Coverage for cc_modules/cc_task.py: 38%

910 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 15:51 +0100

1""" 

2camcops_server/cc_modules/cc_task.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Represents CamCOPS tasks.** 

27 

28Core task export methods: 

29 

30======= ======================================================================= 

31Format Comment 

32======= ======================================================================= 

33HTML The task in a user-friendly format. 

34PDF Essentially the HTML output, but with page headers and (for clinician 

35 tasks) a signature block, and without additional HTML administrative 

36 hyperlinks. 

37XML Centres on the task with its subdata integrated. 

38TSV Tab-separated value format. 

39SQL As part of an SQL or SQLite download. 

40======= ======================================================================= 

41 

42""" 

43 

44from base64 import b64encode 

45from collections import Counter, OrderedDict 

46import datetime 

47import logging 

48import statistics 

49from typing import ( 

50 Any, 

51 Dict, 

52 Iterable, 

53 Generator, 

54 List, 

55 Optional, 

56 Set, 

57 Tuple, 

58 Type, 

59 TYPE_CHECKING, 

60 Union, 

61) 

62 

63from cardinal_pythonlib.classes import classproperty 

64from cardinal_pythonlib.datetimefunc import ( 

65 convert_datetime_to_utc, 

66 format_datetime, 

67 pendulum_to_utc_datetime_without_tz, 

68) 

69from cardinal_pythonlib.httpconst import MimeType 

70from cardinal_pythonlib.logs import BraceStyleAdapter 

71from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName 

72from cardinal_pythonlib.sqlalchemy.orm_inspect import ( 

73 gen_columns, 

74 gen_orm_classes_from_base, 

75) 

76from cardinal_pythonlib.sqlalchemy.schema import ( 

77 is_sqlatype_binary, 

78 is_sqlatype_string, 

79) 

80from cardinal_pythonlib.stringfunc import mangle_unicode_to_ascii 

81from fhirclient.models.attachment import Attachment 

82from fhirclient.models.bundle import Bundle 

83from fhirclient.models.codeableconcept import CodeableConcept 

84from fhirclient.models.coding import Coding 

85from fhirclient.models.contactpoint import ContactPoint 

86from fhirclient.models.documentreference import ( 

87 DocumentReference, 

88 DocumentReferenceContent, 

89) 

90from fhirclient.models.fhirreference import FHIRReference 

91from fhirclient.models.humanname import HumanName 

92from fhirclient.models.identifier import Identifier 

93from fhirclient.models.observation import Observation 

94from fhirclient.models.practitioner import Practitioner 

95from fhirclient.models.questionnaire import Questionnaire 

96from fhirclient.models.questionnaireresponse import QuestionnaireResponse 

97import hl7 

98from pendulum import Date as PendulumDate, DateTime as Pendulum 

99from pyramid.renderers import render 

100from semantic_version import Version 

101from sqlalchemy.ext.declarative import declared_attr 

102from sqlalchemy.orm import Mapped, mapped_column, relationship 

103from sqlalchemy.sql.expression import not_, update 

104from sqlalchemy.sql.schema import Column, Table 

105from sqlalchemy.sql.sqltypes import ( 

106 Boolean, 

107 Date as DateColType, 

108 DateTime, 

109 Float, 

110 Integer, 

111 Numeric, 

112 String, 

113 Text, 

114 Time, 

115) 

116 

117from camcops_server.cc_modules.cc_audit import audit 

118from camcops_server.cc_modules.cc_baseconstants import DOCUMENTATION_URL 

119from camcops_server.cc_modules.cc_blob import Blob, get_blob_img_html 

120from camcops_server.cc_modules.cc_cache import cache_region_static, fkg 

121from camcops_server.cc_modules.cc_constants import ( 

122 ASCII, 

123 CssClass, 

124 CSS_PAGED_MEDIA, 

125 DateFormat, 

126 FHIRConst as Fc, 

127 FileType, 

128 ERA_NOW, 

129 INVALID_VALUE, 

130 UTF8, 

131) 

132from camcops_server.cc_modules.cc_dataclasses import SummarySchemaInfo 

133from camcops_server.cc_modules.cc_db import ( 

134 GenericTabletRecordMixin, 

135 SFN_CAMCOPS_SERVER_VERSION, 

136 SFN_IS_COMPLETE, 

137 SFN_SECONDS_CREATION_TO_FIRST_FINISH, 

138 TASK_FREQUENT_FIELDS, 

139 TFN_EDITING_TIME_S, 

140 TFN_FIRSTEXIT_IS_ABORT, 

141 TFN_FIRSTEXIT_IS_FINISH, 

142 TFN_PATIENT_ID, 

143 TFN_RESPONDENT_NAME, 

144 TFN_RESPONDENT_RELATIONSHIP, 

145 TFN_WHEN_CREATED, 

146 TFN_WHEN_FIRSTEXIT, 

147) 

148from camcops_server.cc_modules.cc_exception import FhirExportException 

149from camcops_server.cc_modules.cc_fhir import ( 

150 fhir_observation_component_from_snomed, 

151 fhir_system_value, 

152 fhir_sysval_from_id, 

153 FHIRAnsweredQuestion, 

154 FHIRAnswerType, 

155 FHIRQuestionType, 

156 make_fhir_bundle_entry, 

157) 

158from camcops_server.cc_modules.cc_filename import get_export_filename 

159from camcops_server.cc_modules.cc_hl7 import make_obr_segment, make_obx_segment 

160from camcops_server.cc_modules.cc_html import ( 

161 get_present_absent_none, 

162 get_true_false_none, 

163 get_yes_no, 

164 get_yes_no_none, 

165 tr, 

166 tr_qa, 

167) 

168from camcops_server.cc_modules.cc_pdf import pdf_from_html 

169from camcops_server.cc_modules.cc_pyramid import Routes, ViewArg 

170from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions 

171from camcops_server.cc_modules.cc_snomed import SnomedLookup 

172from camcops_server.cc_modules.cc_specialnote import SpecialNote 

173from camcops_server.cc_modules.cc_sqla_coltypes import ( 

174 camcops_column, 

175 COLATTR_PERMITTED_VALUE_CHECKER, 

176 gen_ancillary_relationships, 

177 get_camcops_blob_column_attr_names, 

178 get_column_attr_names, 

179 mapped_camcops_column, 

180 PendulumDateTimeAsIsoTextColType, 

181 permitted_value_failure_msgs, 

182 permitted_values_ok, 

183 PermittedValueChecker, 

184 SemanticVersionColType, 

185 TableNameColType, 

186) 

187from camcops_server.cc_modules.cc_sqlalchemy import Base, get_table_ddl 

188from camcops_server.cc_modules.cc_summaryelement import ( 

189 ExtraSummaryTable, 

190 SummaryElement, 

191) 

192from camcops_server.cc_modules.cc_version import ( 

193 CAMCOPS_SERVER_VERSION, 

194 CAMCOPS_SERVER_VERSION_STRING, 

195 MINIMUM_TABLET_VERSION, 

196) 

197from camcops_server.cc_modules.cc_xml import ( 

198 get_xml_document, 

199 XML_COMMENT_ANCILLARY, 

200 XML_COMMENT_ANONYMOUS, 

201 XML_COMMENT_BLOBS, 

202 XML_COMMENT_CALCULATED, 

203 XML_COMMENT_PATIENT, 

204 XML_COMMENT_SNOMED_CT, 

205 XML_COMMENT_SPECIAL_NOTES, 

206 XML_NAME_SNOMED_CODES, 

207 XmlElement, 

208 XmlLiteral, 

209) 

210 

211if TYPE_CHECKING: 

212 from camcops_server.cc_modules.cc_ctvinfo import CtvInfo 

213 from camcops_server.cc_modules.cc_exportrecipient import ( 

214 ExportRecipient, 

215 ) 

216 from camcops_server.cc_modules.cc_patient import Patient 

217 from camcops_server.cc_modules.cc_patientidnum import ( 

218 PatientIdNum, 

219 ) 

220 from camcops_server.cc_modules.cc_request import ( 

221 CamcopsRequest, 

222 ) 

223 from camcops_server.cc_modules.cc_snomed import ( 

224 SnomedExpression, 

225 ) 

226 from camcops_server.cc_modules.cc_trackerhelpers import ( 

227 TrackerInfo, 

228 ) 

229 from camcops_server.cc_modules.cc_spreadsheet import ( 

230 SpreadsheetPage, 

231 ) 

232 

# Module-level logger; BraceStyleAdapter allows "{}"-style log formatting.
log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Debugging options
# =============================================================================

DEBUG_SKIP_FHIR_DOCS = False
DEBUG_SHOW_FHIR_QUESTIONNAIRE = False

# Warn at import time if any debugging switch has been left on.
if DEBUG_SKIP_FHIR_DOCS or DEBUG_SHOW_FHIR_QUESTIONNAIRE:
    log.warning("Debugging options enabled!")


# =============================================================================
# Constants
# =============================================================================

# Forward-reference names, used as string stand-ins inside type hints.
ANCILLARY_FWD_REF = "Ancillary"
TASK_FWD_REF = "Task"

FHIR_UNKNOWN_TEXT = "[?]"

# Table and column names relating to SNOMED CT export.
SNOMED_TABLENAME = "_snomed_ct"
SNOMED_COLNAME_TASKTABLE = "task_tablename"
SNOMED_COLNAME_TASKPK = "task_pk"
SNOMED_COLNAME_WHENCREATED_UTC = "when_created"
SNOMED_COLNAME_EXPRESSION = "snomed_expression"
UNUSED_SNOMED_XML_NAME = "snomed_ct_expressions"

262 

263 

264# ============================================================================= 

265# Patient mixin 

266# ============================================================================= 

267 

268 

class TaskHasPatientMixin(object):
    """
    Mixin for tasks that belong to a patient (i.e. are not anonymous).

    Adds the ``patient_id`` column, the ``patient`` relationship, and a
    ``has_patient`` class property that is always true.
    """

    # Background on relationships declared in mixins:
    # https://docs.sqlalchemy.org/en/latest/orm/extensions/declarative/mixins.html#using-advanced-relationship-arguments-e-g-primaryjoin-etc # noqa

    # SQLAlchemy column that is a foreign key to the patient table.
    patient_id: Mapped[int] = mapped_column(
        TFN_PATIENT_ID,
        index=True,
        comment="(TASK) Foreign key to patient.id (for this device/era)",
    )

    # noinspection PyMethodParameters
    @declared_attr
    def patient(cls) -> Mapped["Patient"]:
        """
        SQLAlchemy relationship: "the patient for this task".

        The join matches on patient ID, device and era, and only accepts the
        CURRENT patient record. Older versions in a patient's editing chain
        are therefore never retrieved, so task version history doesn't show
        the history of patient edits. This is consistent with the
        relationship strategy used throughout for the web front-end viewer.

        Compare :func:`camcops_server.cc_modules.cc_blob.blob_relationship`,
        which uses the same strategy, as do several other similar functions.
        """
        task = cls.__name__  # type: ignore[attr-defined]
        join_condition = (
            "and_("
            f" remote(Patient.id) == foreign({task}.patient_id), "
            f" remote(Patient._device_id) == foreign({task}._device_id), "
            f" remote(Patient._era) == foreign({task}._era), "
            " remote(Patient._current) == True "
            ")"
        )
        return relationship(
            "Patient",
            primaryjoin=join_condition,
            uselist=False,
            viewonly=True,
            # Profiling results 2019-10-14 exporting 4185 phq9 records with
            # unique patients to xlsx
            # lazy="select" : 59.7s
            # lazy="joined" : 44.3s
            # lazy="subquery": 36.9s
            # lazy="selectin": 35.3s
            # See also idnums relationship on Patient class (cc_patient.py)
            lazy="selectin",
        )

    # noinspection PyMethodParameters
    @classproperty
    def has_patient(cls) -> bool:
        """
        Does this task have a patient? Always true for this mixin.
        """
        return True

332 

333 

334# ============================================================================= 

335# Clinician mixin 

336# ============================================================================= 

337 

338 

class TaskHasClinicianMixin(object):
    """
    Mixin that adds clinician columns and overrides clinician-related
    methods.

    It must appear to the LEFT of ``Task`` in a task class's list of bases,
    so that it takes precedence over ``Task`` in the method resolution order.
    """

    # All clinician columns are free text and are exempt from anonymisation.

    clinician_specialty: Mapped[Optional[str]] = mapped_camcops_column(
        Text,
        exempt_from_anonymisation=True,
        comment="(CLINICIAN) Clinician's specialty "
        "(e.g. Liaison Psychiatry)",
    )

    clinician_name: Mapped[Optional[str]] = mapped_camcops_column(
        Text,
        exempt_from_anonymisation=True,
        comment="(CLINICIAN) Clinician's name (e.g. Dr X)",
    )

    clinician_professional_registration: Mapped[Optional[str]] = (
        mapped_camcops_column(
            Text,
            exempt_from_anonymisation=True,
            comment="(CLINICIAN) Clinician's professional registration (e.g. "
            "GMC# 12345)",
        )
    )

    clinician_post: Mapped[Optional[str]] = mapped_camcops_column(
        Text,
        exempt_from_anonymisation=True,
        comment="(CLINICIAN) Clinician's post (e.g. Consultant)",
    )

    clinician_service: Mapped[Optional[str]] = mapped_camcops_column(
        Text,
        exempt_from_anonymisation=True,
        comment="(CLINICIAN) Clinician's service (e.g. Liaison Psychiatry "
        "Service)",
    )

    clinician_contact_details: Mapped[Optional[str]] = mapped_camcops_column(
        Text,
        exempt_from_anonymisation=True,
        comment="(CLINICIAN) Clinician's contact details (e.g. bleep, "
        "extension)",
    )

    # For field order, see also:
    # https://stackoverflow.com/questions/3923910/sqlalchemy-move-mixin-columns-to-end # noqa

    # noinspection PyMethodParameters
    @classproperty
    def has_clinician(cls) -> bool:
        """
        Does the task have a clinician? Always true for this mixin.
        """
        return True

    def get_clinician_name(self) -> str:
        """
        Returns the clinician's name, or an empty string if none is stored.
        """
        name = self.clinician_name
        return name if name else ""

    def get_clinician_fhir_telecom_other(self, req: "CamcopsRequest") -> str:
        """
        Returns a single string gathering the clinician details that don't
        fit neatly into a FHIR Practitioner object, but that people might
        actually want to know.

        Each non-empty field is rendered as "<translated label> <value>", and
        the parts are joined with " | ". Empty fields are omitted.
        """
        _ = req.gettext
        parts = []  # type: List[str]

        # e.g. Consultant
        post = self.clinician_post
        if post:
            parts.append(f'{_("Post:")} {post}')

        # e.g. Liaison Psychiatry
        specialty = self.clinician_specialty
        if specialty:
            parts.append(f'{_("Specialty:")} {specialty}')

        # e.g. GMC# 12345
        registration = self.clinician_professional_registration
        if registration:
            parts.append(f'{_("Professional registration:")} {registration}')

        # e.g. Liaison Psychiatry Service
        service = self.clinician_service
        if service:
            parts.append(f'{_("Service:")} {service}')

        # e.g. tel. x12345
        contact = self.clinician_contact_details
        if contact:
            parts.append(f'{_("Contact details:")} {contact}')

        return " | ".join(parts)

441 

442 

443# ============================================================================= 

444# Respondent mixin 

445# ============================================================================= 

446 

447 

class TaskHasRespondentMixin(object):
    """
    Mixin that adds respondent columns and overrides respondent-related
    methods.

    A respondent is someone who is neither the patient nor a clinician, such
    as a family member or carer.

    It must appear to the LEFT of ``Task`` in a task class's list of bases,
    so that it takes precedence over ``Task`` in the method resolution order.

    Notes:

    - Original author's note: if you don't use ``@declared_attr``, the
      ``comment`` property on columns doesn't work. (The columns below are
      nevertheless declared directly; whether the note still applies has not
      been verified here.)
    """

    # The respondent's name is flagged as patient-identifying.
    respondent_name: Mapped[Optional[str]] = camcops_column(  # type: ignore[assignment] # noqa: E501
        TFN_RESPONDENT_NAME,
        Text,
        identifies_patient=True,
        comment="(RESPONDENT) Respondent's name",
    )

    respondent_relationship: Mapped[Optional[str]] = mapped_column(
        TFN_RESPONDENT_RELATIONSHIP,
        Text,
        comment="(RESPONDENT) Respondent's relationship to patient",
    )

    # noinspection PyMethodParameters
    @classproperty
    def has_respondent(cls) -> bool:
        """
        Does the class have a respondent? Always true for this mixin.
        """
        return True

    def is_respondent_complete(self) -> bool:
        """
        Do we know enough about the respondent? That requires both a name
        and a relationship to the patient to be present (non-empty).
        """
        return bool(self.respondent_name and self.respondent_relationship)

493 

494 

495# ============================================================================= 

496# Task base class 

497# ============================================================================= 

498 

499 

500class Task(GenericTabletRecordMixin, Base): 

501 """ 

502 Abstract base class for all tasks. 

503 

504 Note: 

505 

506 - For column definitions: use 

507 :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`, not 

508 :class:`Column`, if you have fields that need to define permitted values, 

509 mark them as BLOB-referencing fields, or do other CamCOPS-specific 

510 things. 

511 

512 """ 

513 

514 __abstract__ = True 

515 

516 # noinspection PyMethodParameters 

@declared_attr.directive
def __mapper_args__(cls) -> dict[str, Any]:
    """
    SQLAlchemy mapper arguments: each class is mapped with concrete
    inheritance, using its own class name as the polymorphic identity.
    """
    return dict(
        polymorphic_identity=cls.__name__,  # type: ignore[attr-defined]
        concrete=True,
    )

520 

521 # ========================================================================= 

522 # PART 0: COLUMNS COMMON TO ALL TASKS 

523 # ========================================================================= 

524 

525 # Columns 

526 

# Column representing the task's creation time.
when_created: Mapped[Pendulum] = mapped_column(
    TFN_WHEN_CREATED,
    PendulumDateTimeAsIsoTextColType,
    comment="(TASK) Date/time this task instance was created "
    "(ISO 8601)",
)

# Column representing when the user first exited the task's editor
# (i.e. first "finish" or first "abort").
when_firstexit: Mapped[Optional[Pendulum]] = mapped_column(
    TFN_WHEN_FIRSTEXIT,
    PendulumDateTimeAsIsoTextColType,
    comment="(TASK) Date/time of the first exit from this task (ISO 8601)",
)

# Was the first exit from the task's editor a successful "finish"?
firstexit_is_finish: Mapped[Optional[bool]] = mapped_column(
    TFN_FIRSTEXIT_IS_FINISH,
    comment="(TASK) Was the first exit from the task because it was "
    "finished (1)?",
)

# Was the first exit from the task's editor an "abort"?
firstexit_is_abort: Mapped[Optional[bool]] = mapped_column(
    TFN_FIRSTEXIT_IS_ABORT,
    comment="(TASK) Was the first exit from this task because it was "
    "aborted (1)?",
)

# How long has the user spent editing the task?
# (Calculated by the CamCOPS client.)
editing_time_: Mapped[Optional[float]] = mapped_column(
    TFN_EDITING_TIME_S, comment="(TASK) Time spent editing (s)"
)

577 

578 # Relationships 

579 

580 # noinspection PyMethodParameters 

@declared_attr
def special_notes(cls) -> Mapped[List[SpecialNote]]:
    """
    List-style SQLAlchemy relationship to the :class:`SpecialNote` objects
    attached to this task.

    Notes are matched on base table name, task ID, device and era; hidden
    (quasi-deleted) notes are skipped. Results are ordered by
    ``SpecialNote.note_at``. The relationship is view-only.
    """
    task = cls.__name__  # type: ignore[attr-defined]
    tablename_literal = repr(cls.__tablename__)
    join_condition = (
        "and_("
        f" remote(SpecialNote.basetable) == literal({tablename_literal}), "  # noqa
        f" remote(SpecialNote.task_id) == foreign({task}.id), "
        f" remote(SpecialNote.device_id) == foreign({task}._device_id), "  # noqa
        f" remote(SpecialNote.era) == foreign({task}._era), "
        " not_(SpecialNote.hidden)"
        ")"
    )
    return relationship(
        SpecialNote,
        primaryjoin=join_condition,
        uselist=True,
        order_by="SpecialNote.note_at",
        viewonly=True,  # for now!
    )

605 

606 # ========================================================================= 

607 # PART 1: THINGS THAT DERIVED CLASSES MAY CARE ABOUT 

608 # ========================================================================= 

609 # 

610 # Notes: 

611 # 

612 # - for summaries, see GenericTabletRecordMixin.get_summaries 

613 

614 # ------------------------------------------------------------------------- 

615 # Attributes that must be provided 

616 # ------------------------------------------------------------------------- 

# Name of the task's primary table; also the SQLAlchemy table name.
__tablename__ = None  # type: str
# Short name of the task.
shortname = None  # type: str

# -------------------------------------------------------------------------
# Attributes that can be overridden
# -------------------------------------------------------------------------
# If None, tablename is used instead:
extrastring_taskname = None  # type: str
# If None, tablename is used instead:
info_filename_stem = None  # type: str
provides_trackers = False
use_landscape_for_pdf = False
dependent_classes = []  # type: ignore[var-annotated]

# Usage restrictions; see prohibits_anything().
prohibits_clinical = False
prohibits_commercial = False
prohibits_educational = False
prohibits_research = False

637 

@classmethod
def prohibits_anything(cls) -> bool:
    """
    Is at least one of the ``prohibits_*`` flags (clinical, commercial,
    educational, research) set on this task class?
    """
    return bool(
        cls.prohibits_clinical
        or cls.prohibits_commercial
        or cls.prohibits_educational
        or cls.prohibits_research
    )

648 

649 # ------------------------------------------------------------------------- 

650 # Methods always overridden by the actual task 

651 # ------------------------------------------------------------------------- 

652 

@staticmethod
def longname(req: "CamcopsRequest") -> str:
    """
    Long name of the task (in the relevant language).

    Every concrete task must override this; the base version always raises
    :exc:`NotImplementedError`.
    """
    message = "Task.longname must be overridden"
    raise NotImplementedError(message)

659 

def is_complete(self) -> bool:
    """
    Is the task instance complete?

    Every concrete task must override this; the base version always raises
    :exc:`NotImplementedError`.
    """
    message = "Task.is_complete must be overridden"
    raise NotImplementedError(message)

667 

def get_task_html(self, req: "CamcopsRequest") -> str:
    """
    HTML for the main task content.

    Derived classes must override this; the base version always raises
    :exc:`NotImplementedError`.
    """
    message = "No get_task_html() HTML generator for this task class!"
    raise NotImplementedError(message)

677 

678 # ------------------------------------------------------------------------- 

679 # Implement if you provide trackers 

680 # ------------------------------------------------------------------------- 

681 

def get_trackers(self, req: "CamcopsRequest") -> List["TrackerInfo"]:
    """
    Hook for tasks that provide quantitative information for tracking over
    time. Such tasks should override this and return a list of
    :class:`camcops_server.cc_modules.cc_trackerhelpers.TrackerInfo`
    objects, one per tracker.

    The information is read by
    :meth:`camcops_server.cc_modules.cc_tracker.Tracker.get_all_plots_for_one_task_html`.

    Time information will be retrieved using :func:`get_creation_datetime`.

    The base implementation returns an empty list (no trackers).
    """
    return list()

695 

696 # ------------------------------------------------------------------------- 

697 # Override to provide clinical text 

698 # ------------------------------------------------------------------------- 

699 

700 # noinspection PyMethodMayBeStatic 

def get_clinical_text(
    self, req: "CamcopsRequest"
) -> Optional[List["CtvInfo"]]:
    """
    Hook for tasks that provide clinical text information.

    Overriding tasks should return a list of
    :class:`camcops_server.cc_modules.cc_ctvinfo.CtvInfo` objects, or ``[]``
    if the task provides clinical text in general but has nothing to say
    for this particular instance.

    The base implementation returns ``None``, meaning that the task doesn't
    provide clinical text at all.
    """
    no_clinical_text = None  # type: Optional[List["CtvInfo"]]
    return no_clinical_text

715 

716 # ------------------------------------------------------------------------- 

717 # Override some of these if you provide summaries 

718 # ------------------------------------------------------------------------- 

719 

720 # noinspection PyMethodMayBeStatic,PyUnusedLocal 

def get_extra_summary_tables(
    self, req: "CamcopsRequest"
) -> List[ExtraSummaryTable]:
    """
    Hook for tasks that want to create extra summary tables, rather than
    just adding summary columns to task/ancillary tables.

    Overriding tasks should return a list of
    :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`
    objects. The base implementation returns an empty list.
    """
    return list()

733 

734 # ------------------------------------------------------------------------- 

735 # Implement if you provide SNOMED-CT codes 

736 # ------------------------------------------------------------------------- 

737 

738 # noinspection PyMethodMayBeStatic,PyUnusedLocal 

def get_snomed_codes(
    self, req: "CamcopsRequest"
) -> List["SnomedExpression"]:
    """
    Returns all SNOMED-CT codes for this task. Tasks that provide SNOMED-CT
    codes should override this; the base implementation returns an empty
    list.

    Args:
        req: the
            :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

    Returns:
        a list of
        :class:`camcops_server.cc_modules.cc_snomed.SnomedExpression`
        objects

    """
    return list()

756 

757 # ========================================================================= 

758 # PART 2: INTERNALS 

759 # ========================================================================= 

760 

761 # ------------------------------------------------------------------------- 

762 # Representations 

763 # ------------------------------------------------------------------------- 

764 

def __str__(self) -> str:
    """
    Human-readable description: table name, PK, creation time (or "None"
    if unset) and, for non-anonymous tasks, the patient.
    """
    created = self.when_created
    created_text = (
        format_datetime(created, DateFormat.ERA) if created else "None"
    )
    patient_text = "" if self.is_anonymous else f", patient={self.patient}"
    return (
        f"{self.tablename} (_pk={self.pk}, "
        f"when_created={created_text}{patient_text})"
    )

780 

def __repr__(self) -> str:
    """
    Debugging representation: qualified class name, PK and creation time
    (or "None" if unset).
    """
    created = self.when_created
    created_text = (
        format_datetime(created, DateFormat.ERA) if created else "None"
    )
    classname = self.__class__.__qualname__
    return f"<{classname}(_pk={self.pk}, when_created={created_text})>"

791 

792 # ------------------------------------------------------------------------- 

793 # Way to fetch all task types 

794 # ------------------------------------------------------------------------- 

795 

@classmethod
def gen_all_subclasses(cls) -> Generator[Type[TASK_FWD_REF], None, None]:  # type: ignore[valid-type] # noqa: E501
    """
    Generate all non-abstract SQLAlchemy ORM subclasses of :class:`Task` --
    that is, all task classes. The work is delegated to
    ``gen_orm_classes_from_base``.

    We require that actual tasks are subclasses of both :class:`Task` and
    :class:`camcops_server.cc_modules.cc_sqlalchemy.Base`.

    OLD WAY (ignore): this means we can (a) inherit from Task to make an
    abstract base class for actual tasks, as with PCL, HADS, HoNOS, etc.;
    and (b) not have those intermediate classes appear in the task list.
    Since all actual classes must be SQLAlchemy ORM objects inheriting from
    Base, that common inheritance is an excellent way to define them.

    NEW WAY: things now inherit from Base/Task without necessarily
    being actual tasks; we discriminate using ``__abstract__`` and/or
    ``__tablename__``. See
    https://docs.sqlalchemy.org/en/latest/orm/inheritance.html#abstract-concrete-classes
    """
    # noinspection PyTypeChecker
    task_classes = gen_orm_classes_from_base(cls)
    return task_classes

818 

@classmethod
@cache_region_static.cache_on_arguments(function_key_generator=fkg)
def all_subclasses_by_tablename(cls) -> List[Type[TASK_FWD_REF]]:  # type: ignore[valid-type] # noqa: E501
    """
    Return a list of all task classes, sorted by table name. The result is
    cached via ``cache_region_static``.
    """
    return sorted(
        cls.gen_all_subclasses(),
        key=lambda task_cls: task_cls.tablename,  # type: ignore[attr-defined] # noqa: E501
    )

828 

@classmethod
@cache_region_static.cache_on_arguments(function_key_generator=fkg)
def all_subclasses_by_shortname(cls) -> List[Type[TASK_FWD_REF]]:  # type: ignore[valid-type] # noqa: E501
    """
    Return a list of all task classes, sorted by short name. The result is
    cached via ``cache_region_static``.
    """
    return sorted(
        cls.gen_all_subclasses(),
        key=lambda task_cls: task_cls.shortname,  # type: ignore[attr-defined] # noqa: E501
    )

838 

@classmethod
def all_subclasses_by_longname(
    cls, req: "CamcopsRequest"
) -> List[Type[TASK_FWD_REF]]:  # type: ignore[valid-type]
    """
    Return all task classes, ordered by long name.

    Args:
        req: the
            :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`,
            passed to each class's ``longname()`` so that names are compared
            in the request's language

    Returns:
        a new list of task classes; the list obtained from
        :meth:`all_subclasses_by_shortname` is left untouched
    """
    # Use sorted() rather than list.sort(): all_subclasses_by_shortname() is
    # wrapped in a cache decorator, so the list it returns may be the very
    # object held by the cache. Sorting that list in place would silently
    # reorder the cached "by short name" result for every later caller.
    return sorted(
        cls.all_subclasses_by_shortname(),
        key=lambda task_cls: task_cls.longname(req),
    )

849 

850 # ------------------------------------------------------------------------- 

851 # Methods that may be overridden by mixins 

852 # ------------------------------------------------------------------------- 

853 

854 # noinspection PyMethodParameters 

@classproperty
def has_patient(cls) -> bool:
    """
    Does the task have a patient? Not by default.

    May be overridden by :class:`TaskHasPatientMixin`.
    """
    task_has_patient = False
    return task_has_patient

863 

864 # noinspection PyMethodParameters 

@classproperty
def is_anonymous(cls) -> bool:
    """
    Is the task anonymous? The logical opposite of :attr:`has_patient`.
    """
    task_has_patient = cls.has_patient
    return not task_has_patient

871 

872 # noinspection PyMethodParameters 

@classproperty
def has_clinician(cls) -> bool:
    """
    Does the task have a clinician? Not by default.

    May be overridden by :class:`TaskHasClinicianMixin`.
    """
    task_has_clinician = False
    return task_has_clinician

881 

882 # noinspection PyMethodParameters 

@classproperty
def has_respondent(cls) -> bool:
    """
    Does the task have a respondent? Not by default.

    May be overridden by :class:`TaskHasRespondentMixin`.
    """
    task_has_respondent = False
    return task_has_respondent

891 

892 # ------------------------------------------------------------------------- 

893 # Other classmethods 

894 # ------------------------------------------------------------------------- 

895 

896 # noinspection PyMethodParameters 

@classproperty
def tablename(cls) -> str:
    """
    Returns the name of the task's primary database table, i.e. the class's
    ``__tablename__``.
    """
    primary_table_name = cls.__tablename__
    return primary_table_name

903 

904 # noinspection PyMethodParameters 

@classproperty
def minimum_client_version(cls) -> Version:
    """
    Returns the minimum client version that provides this task. The default
    is ``MINIMUM_TABLET_VERSION``.

    Override this as you add tasks.

    Used by
    :func:`camcops_server.cc_modules.client_api.ensure_valid_table_name`.

    (There are some pre-C++ client versions for which the default is not
    exactly accurate, and the tasks do not override, but this is of no
    consequence and the version numbering system also changed, from
    something legible as a float -- e.g. ``1.2 > 1.14`` -- to something
    interpreted as a semantic version -- e.g. ``1.2 < 1.14``. So we ignore
    that.)
    """
    default_version = MINIMUM_TABLET_VERSION
    return default_version

923 

924 # noinspection PyMethodParameters 

925 @classmethod 

def all_tables_with_min_client_version(cls) -> Dict[str, Version]:
    """
    Build a dictionary whose keys are the names of every table belonging
    to this task (the primary table, then each ancillary table) and whose
    values are all the task's minimum client version.
    """
    min_version = cls.minimum_client_version
    table_versions = {cls.__tablename__: min_version}  # type: Dict[str, Version]
    table_versions.update(
        (ancillary_cls.__tablename__, min_version)
        for _, _, ancillary_cls in gen_ancillary_relationships(cls)
    )
    return table_versions

936 

937 @classmethod 

def all_tables(cls) -> List[Table]:
    """
    Collect the ``__table__`` object of the task class itself followed by
    that of each ancillary class (primary table first).
    """
    # noinspection PyUnresolvedReferences
    tables = [cls.__table__]
    for _, _, ancillary_cls in gen_ancillary_relationships(cls):
        tables.append(ancillary_cls.__table__)  # type: ignore[attr-defined]
    return tables  # type: ignore[return-value]

947 

948 @classmethod 

def get_ddl(cls, dialect_name: str = SqlaDialectName.MYSQL) -> str:
    """
    Produce the DDL for the primary table and any ancillary tables.

    Args:
        dialect_name: SQL dialect name passed to ``get_table_ddl``

    Returns:
        the stripped DDL of each table, joined by blank lines
    """
    ddl_chunks = [
        get_table_ddl(table, dialect_name).strip()
        for table in cls.all_tables()
    ]
    return "\n\n".join(ddl_chunks)

956 

957 @classmethod 

def help_url(cls) -> str:
    """
    URL of the online help page specific to this task.

    The page name comes from :meth:`help_url_basename`. By default that is
    the tablename -- e.g. ``phq9``, giving ``phq9.html`` in the
    documentation (from ``phq9.rst`` in the source). Tasks may override
    this by writing

    .. code-block:: python

        info_filename_stem = "XXX"

    In the C++ code, compare infoFilenameStem() for individual tasks and
    urlconst::taskDocUrl() overall.

    The online help is presently only in English.
    """
    stem = cls.help_url_basename()
    language = "en"
    # No separator is inserted: DOCUMENTATION_URL already ends in a slash.
    return DOCUMENTATION_URL + f"{language}/latest/tasks/{stem}.html"

979 

980 @classmethod 

def help_url_basename(cls) -> str:
    """
    Filename stem used when building the help URL: the class's
    ``info_filename_stem`` if that is truthy, otherwise its ``tablename``.
    """
    stem = cls.info_filename_stem
    if stem:
        return stem
    return cls.tablename

983 

984 # ------------------------------------------------------------------------- 

985 # More on fields 

986 # ------------------------------------------------------------------------- 

987 

988 @classmethod 

def get_fieldnames(cls) -> List[str]:
    """
    List the names of all fields (columns) of this task's primary table,
    as reported by ``get_column_attr_names``.
    """
    fieldnames = get_column_attr_names(cls)
    return fieldnames

994 

def field_contents_valid(self) -> bool:
    """
    Are the field contents valid (per ``permitted_values_ok``)?

    Deliberately fast and without explanations, because many task
    :func:`is_complete` calculations call it. For the reasons behind a
    failure, see :meth:`field_contents_invalid_because`.
    """
    contents_ok = permitted_values_ok(self)
    return contents_ok

1003 

def field_contents_invalid_because(self) -> List[str]:
    """
    List the messages (from ``permitted_value_failure_msgs``) explaining
    why the field contents are invalid.
    """
    failure_messages = permitted_value_failure_msgs(self)
    return failure_messages

1009 

def get_blob_fields(self) -> List[str]:
    """
    List the field (column) names of all BLOB fields, as reported by
    ``get_camcops_blob_column_attr_names``.
    """
    blob_fieldnames = get_camcops_blob_column_attr_names(self)
    return blob_fieldnames

1015 

1016 # ------------------------------------------------------------------------- 

1017 # Server field calculations 

1018 # ------------------------------------------------------------------------- 

1019 

def is_preserved(self) -> bool:
    """
    Has the task been preserved (and erased from the tablet)?

    True when the record has a server PK and its era is something other
    than ``ERA_NOW``.
    """
    if self._pk is None:
        return False
    return self._era != ERA_NOW

1025 

def was_forcibly_preserved(self) -> bool:
    """
    Was this task forcibly preserved?

    Returns:
        ``True`` only if the ``_forcibly_preserved`` flag is truthy and
        :meth:`is_preserved` is true; always a real ``bool``.
    """
    # "x and y" would hand back x itself when x is falsy (e.g. None or 0),
    # contradicting the declared return type; coerce the flag first.
    return bool(self._forcibly_preserved) and self.is_preserved()

1031 

def get_creation_datetime(self) -> Optional[Pendulum]:
    """
    The task's ``when_created`` value (its creation datetime), which may
    be None.
    """
    created = self.when_created
    return created

1037 

def get_creation_datetime_utc(self) -> Optional[Pendulum]:
    """
    The creation datetime converted to UTC (via
    ``convert_datetime_to_utc``), or None if there is no creation datetime.
    """
    created = self.get_creation_datetime()
    return None if created is None else convert_datetime_to_utc(created)

1046 

def get_creation_datetime_utc_tz_unaware(
    self,
) -> Optional[datetime.datetime]:
    """
    The creation time expressed in UTC as a :class:`datetime.datetime`
    carrying no timezone information (an "offset-naive" datetime), or None
    if there is no creation datetime.
    """
    created = self.get_creation_datetime()
    return (
        None
        if created is None
        else pendulum_to_utc_datetime_without_tz(created)
    )

1058 

def get_seconds_from_creation_to_first_finish(self) -> Optional[float]:
    """
    Number of seconds between the task's creation and its first exit,
    provided that first exit was a finish (not an abort).

    Returns:
        the elapsed seconds, or None if the first exit was not a finish or
        either timestamp is missing
    """
    if not self.firstexit_is_finish:
        return None
    created = self.get_creation_datetime()
    first_exit = self.when_firstexit
    if created and first_exit:
        return (first_exit - created).total_seconds()
    return None

1072 

def get_adding_user_id(self) -> Optional[int]:
    """
    ID of the user who uploaded this task (the ``_adding_user_id`` field).
    """
    # noinspection PyTypeChecker
    uploader_id = self._adding_user_id
    return uploader_id

1079 

def get_adding_user_username(self) -> str:
    """
    Username of the user who uploaded this task, or ``""`` if no such user
    is recorded.
    """
    user = self._adding_user
    if user:
        return user.username
    return ""

1085 

def get_removing_user_username(self) -> str:
    """
    Username of the user who deleted this task (by removing it on the
    client and re-uploading), or ``""`` if no such user is recorded.
    """
    user = self._removing_user
    if user:
        return user.username
    return ""

1092 

def get_preserving_user_username(self) -> str:
    """
    Username of the user who "preserved" this task (marking it to be saved
    on the server and then deleting it from the client), or ``""`` if no
    such user is recorded.
    """
    user = self._preserving_user
    if user:
        return user.username
    return ""

1099 

def get_manually_erasing_user_username(self) -> str:
    """
    Username of the user who manually erased this task on the server, or
    ``""`` if no such user is recorded.
    """
    user = self._manually_erasing_user
    if user:
        return user.username
    return ""

1110 

1111 # ------------------------------------------------------------------------- 

1112 # Summary tables 

1113 # ------------------------------------------------------------------------- 

1114 

def standard_task_summary_fields(self) -> List[SummaryElement]:
    """
    Build the summary elements that every task provides: whether the task
    is complete, the time from creation to first finish, and the version
    of the CamCOPS server generating the summary.
    """
    complete_element = SummaryElement(
        name=SFN_IS_COMPLETE,
        coltype=Boolean(),
        value=self.is_complete(),
        comment="(GENERIC) Task complete?",
    )
    duration_element = SummaryElement(
        name=SFN_SECONDS_CREATION_TO_FIRST_FINISH,
        coltype=Float(),
        value=self.get_seconds_from_creation_to_first_finish(),
        comment=(
            "(GENERIC) Time (in seconds) from record creation to "
            "first exit, if that was a finish not an abort"
        ),
    )
    server_version_element = SummaryElement(
        name=SFN_CAMCOPS_SERVER_VERSION,
        coltype=SemanticVersionColType(),
        value=CAMCOPS_SERVER_VERSION,
        comment=(
            "(GENERIC) CamCOPS server version that created the "
            "summary information"
        ),
    )
    return [complete_element, duration_element, server_version_element]

1141 

def get_all_summary_tables(
    self, req: "CamcopsRequest"
) -> List[ExtraSummaryTable]:
    """
    Returns all
    :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`
    objects for this class, including any provided by subclasses, plus
    SNOMED CT codes if enabled.

    Args:
        req: the request; its ``snomed_supported`` flag decides whether
            the SNOMED CT table is appended

    Returns:
        a new list; the list returned by :meth:`get_extra_summary_tables`
        is never modified
    """
    # Copy first: appending directly would alter the callee's list, which
    # matters if an implementation returns a shared or cached list.
    tables = list(self.get_extra_summary_tables(req))
    if req.snomed_supported:
        tables.append(self._get_snomed_extra_summary_table(req))
    return tables

1155 

def _get_snomed_extra_summary_table(
    self, req: "CamcopsRequest"
) -> ExtraSummaryTable:
    """
    Build the
    :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`
    holding this task's SNOMED CT codes: one row per code, each recording
    the task's table name, server PK, creation time (UTC, timezone-naive)
    and the SNOMED CT expression as a string.
    """
    snomed_codes = self.get_snomed_codes(req)
    column_defs = [
        Column(
            SNOMED_COLNAME_TASKTABLE,
            TableNameColType,
            comment="Task's base table name",
        ),
        Column(
            SNOMED_COLNAME_TASKPK,
            Integer,
            comment="Task's server primary key",
        ),
        Column(
            SNOMED_COLNAME_WHENCREATED_UTC,
            DateTime,
            comment="Task's creation date/time (UTC)",
        ),
        camcops_column(
            SNOMED_COLNAME_EXPRESSION,
            Text,
            exempt_from_anonymisation=True,
            comment="SNOMED CT expression",
        ),
    ]
    table_rows = [
        OrderedDict(
            (
                (SNOMED_COLNAME_TASKTABLE, self.tablename),
                (SNOMED_COLNAME_TASKPK, self.pk),
                (
                    SNOMED_COLNAME_WHENCREATED_UTC,
                    self.get_creation_datetime_utc_tz_unaware(),
                ),
                (SNOMED_COLNAME_EXPRESSION, snomed_code.as_string()),
            )
        )
        for snomed_code in snomed_codes
    ]  # type: List[Dict[str, Any]]
    return ExtraSummaryTable(
        tablename=SNOMED_TABLENAME,
        xmlname=UNUSED_SNOMED_XML_NAME,  # though actual XML doesn't use this route  # noqa
        columns=column_defs,  # type: ignore[arg-type]
        rows=table_rows,
        task=self,
    )

1209 

1210 # ------------------------------------------------------------------------- 

1211 # Testing 

1212 # ------------------------------------------------------------------------- 

1213 

def dump(self) -> None:
    """
    Write a description of this task instance to the Python log (at INFO
    level), for debugging: one ``name: repr(value)`` line per field,
    framed by lines of equals signs.
    """
    separator = "=" * 79
    field_lines = [
        f"{fieldname}: {getattr(self, fieldname)!r}"
        for fieldname in self.get_fieldnames()
    ]
    log.info("\n".join(["", separator, *field_lines, separator]))

1225 

1226 # ------------------------------------------------------------------------- 

1227 # Special notes 

1228 # ------------------------------------------------------------------------- 

1229 

def apply_special_note(
    self, req: "CamcopsRequest", note: str, from_console: bool = False
) -> None:
    """
    Manually applies a special note to a task.

    Applies it to all predecessor/successor versions as well.
    WRITES TO THE DATABASE.

    Args:
        req:
            the request; supplies the timestamp (``req.now``), the user ID
            (``req.user_id``) and the database session (``req.dbsession``)
        note:
            the text of the note
        from_console:
            passed through to :meth:`audit` and
            :meth:`cancel_from_export_log`
    """
    sn = SpecialNote()
    # The note is keyed by base table, client-side ID, device and era, not
    # by the server PK. NOTE(review): presumably this is what makes it
    # apply to all versions of the task -- confirm against SpecialNote.
    sn.basetable = self.tablename
    sn.task_id = self.id
    sn.device_id = self._device_id
    sn.era = self._era
    sn.note_at = req.now
    sn.user_id = req.user_id
    sn.note = note
    dbsession = req.dbsession
    dbsession.add(sn)
    # Record the action in the audit trail, then cancel the task from the
    # export log.
    self.audit(req, "Special note applied manually", from_console)
    self.cancel_from_export_log(req, from_console)

1251 

1252 # ------------------------------------------------------------------------- 

1253 # Clinician 

1254 # ------------------------------------------------------------------------- 

1255 

1256 # noinspection PyMethodMayBeStatic 

def get_clinician_name(self) -> str:
    """
    Clinician's name; this base implementation returns an empty string.

    May be overridden by :class:`TaskHasClinicianMixin`; q.v.
    """
    return ""

1262 

1263 # noinspection PyMethodMayBeStatic,PyUnusedLocal 

def get_clinician_fhir_telecom_other(self, req: "CamcopsRequest") -> str:
    """
    This base implementation ignores ``req`` and returns an empty string.

    May be overridden by :class:`TaskHasClinicianMixin`; q.v.
    """
    return ""

1269 

1270 # ------------------------------------------------------------------------- 

1271 # Respondent 

1272 # ------------------------------------------------------------------------- 

1273 

1274 # noinspection PyMethodMayBeStatic 

def is_respondent_complete(self) -> bool:
    """
    Is the information about the respondent complete?

    The base implementation always answers ``False``. May be overridden by
    :class:`TaskHasRespondentMixin`.
    """
    return False

1282 

1283 # ------------------------------------------------------------------------- 

1284 # About the associated patient 

1285 # ------------------------------------------------------------------------- 

1286 

1287 @property 

def patient(self) -> Optional["Patient"]:
    """
    The :class:`camcops_server.cc_modules.cc_patient.Patient` belonging to
    this task.

    The base implementation has no patient and returns None. Overridden by
    :class:`TaskHasPatientMixin`.
    """
    return None

1296 

def is_female(self) -> bool:
    """
    Is the patient female? ``False`` if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return False
    return patient.is_female()

1302 

def is_male(self) -> bool:
    """
    Is the patient male? ``False`` if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return False
    return patient.is_male()

1308 

def get_patient_server_pk(self) -> Optional[int]:
    """
    The patient's server PK, or None if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return None
    return patient.pk

1314 

def get_patient_forename(self) -> str:
    """
    The patient's forename as returned by the patient's ``get_forename()``
    (described in this file as upper case), or ``""`` if the task has no
    patient.
    """
    patient = self.patient
    if not patient:
        return ""
    return patient.get_forename()

1320 

def get_patient_surname(self) -> str:
    """
    The patient's surname as returned by the patient's ``get_surname()``
    (described in this file as upper case), or ``""`` if the task has no
    patient.
    """
    patient = self.patient
    if not patient:
        return ""
    return patient.get_surname()

1326 

def get_patient_dob(self) -> Optional[PendulumDate]:
    """
    The patient's date of birth, or None if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return None
    return patient.get_dob()

1332 

def get_patient_dob_first11chars(self) -> Optional[str]:
    """
    The first 11 characters of the patient's date-of-birth string, giving
    a short human-readable form such as ``29 Dec 1999``.

    Returns:
        the truncated string, or None if there is no patient or the
        patient's DOB string is empty
    """
    patient = self.patient
    dob_text = patient.get_dob_str() if patient else None
    return dob_text[:11] if dob_text else None

1344 

def get_patient_sex(self) -> str:
    """
    The patient's sex, or ``""`` if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return ""
    return patient.get_sex()

1350 

def get_patient_address(self) -> str:
    """
    The patient's address, or ``""`` if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return ""
    return patient.get_address()

1356 

def get_patient_idnum_objects(self) -> List["PatientIdNum"]:
    """
    All of the patient's
    :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`
    objects, or an empty list if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return []
    return patient.get_idnum_objects()

1364 

def get_patient_idnum_object(
    self, which_idnum: int
) -> Optional["PatientIdNum"]:
    """
    The patient's
    :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum` for
    the ID number type ``which_idnum``, or None if the task has no
    patient.
    """
    patient = self.patient
    if not patient:
        return None
    return patient.get_idnum_object(which_idnum)

1378 

def any_patient_idnums_invalid(self, req: "CamcopsRequest") -> bool:
    """
    Does the patient (if there is one) have any ID number that is not
    fully valid?

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
    """
    return any(
        not idnum.is_fully_valid(req)
        for idnum in self.get_patient_idnum_objects()
    )

1391 

def get_patient_idnum_value(self, which_idnum: int) -> Optional[int]:
    """
    The value of the patient's ID number of type ``which_idnum``, or None
    if no matching ID number object is found.
    """
    idnum_obj = self.get_patient_idnum_object(which_idnum=which_idnum)
    if not idnum_obj:
        return None
    return idnum_obj.idnum_value

1399 

def get_patient_hl7_pid_segment(
    self, req: "CamcopsRequest", recipient_def: "ExportRecipient"
) -> Union[hl7.Segment, str]:
    """
    The HL7 PID segment for the patient (obtained from the patient
    object), or ``""`` if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return ""
    return patient.get_hl7_pid_segment(req, recipient_def)

1411 

1412 # ------------------------------------------------------------------------- 

1413 # HL7 v2 

1414 # ------------------------------------------------------------------------- 

1415 

def get_hl7_data_segments(
    self, req: "CamcopsRequest", recipient_def: "ExportRecipient"
) -> List[hl7.Segment]:
    """
    Assemble the HL7 data segments for this task, in this order:

    - the observation request (OBR) segment
    - the observation result (OBX) segment
    - any extra segments offered by the task (see
      :meth:`get_hl7_extra_data_segments`)
    """
    obr = make_obr_segment(self)
    options = recipient_def.get_task_export_options()
    obx = make_obx_segment(
        req,
        self,
        task_format=recipient_def.task_format,
        # Identifier of the form "<tablename>_<server PK>"
        observation_identifier="_".join((self.tablename, str(self._pk))),
        observation_datetime=self.get_creation_datetime(),
        responsible_observer=self.get_clinician_name(),
        export_options=options,
    )
    extra_segments = self.get_hl7_extra_data_segments(recipient_def)
    return [obr, obx] + extra_segments

1442 

1443 # noinspection PyMethodMayBeStatic,PyUnusedLocal 

def get_hl7_extra_data_segments(
    self, recipient_def: "ExportRecipient"
) -> List[hl7.Segment]:
    """
    Any additional HL7 data segments for the task; the base implementation
    offers none and returns an empty list. (Called by
    :func:`get_hl7_data_segments`.)

    May be overridden.
    """
    return []

1454 

1455 # ------------------------------------------------------------------------- 

1456 # FHIR: framework 

1457 # ------------------------------------------------------------------------- 

1458 

def get_fhir_bundle(
    self,
    req: "CamcopsRequest",
    recipient: "ExportRecipient",
    skip_docs_if_other_content: bool = DEBUG_SKIP_FHIR_DOCS,
) -> Bundle:
    """
    Get a single FHIR Bundle with all entries. See
    :meth:`get_fhir_bundle_entries`.

    Args:
        req: the request
        recipient: the export recipient
        skip_docs_if_other_content: passed on to
            :meth:`get_fhir_bundle_entries`

    Returns:
        a transaction ``Bundle`` wrapping the entries

    Raises:
        AssertionError: if an entry lacks a resource, a resource lacks an
            identifier, or two identifiers share a system/value pair
        Other exceptions raised by :meth:`get_fhir_bundle_entries`
        propagate (the code notes :exc:`FhirExportException`).
    """
    # Get the content:
    bundle_entries = self.get_fhir_bundle_entries(
        req,
        recipient,
        skip_docs_if_other_content=skip_docs_if_other_content,
    )
    # ... may raise FhirExportException

    # Sanity checks:
    id_counter = Counter()  # type: ignore[var-annotated]
    for entry in bundle_entries:
        assert (
            Fc.RESOURCE in entry
        ), f"Bundle entry has no resource: {entry}"  # just wrong
        resource = entry[Fc.RESOURCE]
        assert Fc.IDENTIFIER in resource, (
            f"Bundle entry has no identifier for its resource: "
            f"{resource}"
        )  # might succeed, but would insert an unidentified resource
        identifier = resource[Fc.IDENTIFIER]
        if not isinstance(identifier, list):
            identifier = [identifier]
        id_counter.update(
            fhir_system_value(id_[Fc.SYSTEM], id_[Fc.VALUE])
            for id_ in identifier
        )
    # most_common(1) is empty when no identifiers were collected (e.g. no
    # entries, or only empty identifier lists); indexing it would raise an
    # unhelpful IndexError, so only check for duplicates when there is
    # something to check.
    if id_counter:
        most_common_id, n_occurrences = id_counter.most_common(1)[0]
        assert (
            n_occurrences == 1
        ), f"Resources have duplicate IDs: {most_common_id}"

    # Bundle up the content into a transaction bundle.
    # This is one of the few FHIR objects that we don't return with
    # ".as_json()", because Bundle objects have useful methods for talking
    # to the FHIR server.
    return Bundle(
        jsondict={Fc.TYPE: Fc.TRANSACTION, Fc.ENTRY: bundle_entries}
    )

1507 

def get_fhir_bundle_entries(
    self,
    req: "CamcopsRequest",
    recipient: "ExportRecipient",
    skip_docs_if_other_content: bool = DEBUG_SKIP_FHIR_DOCS,
) -> List[Dict]:
    """
    The "top-level" function providing all FHIR information for the task,
    as a list of bundle entries. In order, these are:

    - the Patient, if the task has one;
    - the clinician, if the task has one;
    - the Questionnaire (task) itself and the QuestionnaireResponse for the
      specific answers from this task instance (added only if both are
      provided);
    - detail entries (Observations and extras);
    - a DocumentReference, unless skipped.

    If the task refuses to support FHIR, raises :exc:`FhirExportException`.

    Args:
        req:
            a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient:
            an
            :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
        skip_docs_if_other_content:
            A debugging option: skip the document (e.g. PDF, HTML, XML),
            making the FHIR output smaller and more legible for debugging.
            However, if the task offers no other content, this will raise
            :exc:`FhirExportException`.
    """  # noqa
    entries = []  # type: List[Dict]

    # Patient (0 or 1)
    if self.has_patient:
        patient_entry = self.patient.get_fhir_bundle_entry(req, recipient)
        entries.append(patient_entry)

    # Clinician (0 or 1)
    if self.has_clinician:
        clinician_entry = self._get_fhir_clinician_bundle_entry(req)
        entries.append(clinician_entry)

    # Questionnaire, then the collection of QuestionnaireResponse entries;
    # only added as a pair.
    questionnaire_entry, response_entry = self._get_fhir_q_qr_bundle_entries(
        req, recipient
    )
    if questionnaire_entry and response_entry:
        entries.append(questionnaire_entry)
        entries.append(response_entry)

    # Observation (0 or more) -- includes Coding
    entries += self._get_fhir_detail_bundle_entries(req, recipient)

    # DocumentReference (0-1; always 1 in normal use)
    if not skip_docs_if_other_content:
        entries.append(self._get_fhir_docref_bundle_entry(req, recipient))
    elif not entries:
        # We can't have nothing!
        raise FhirExportException(
            "Skipping task because DEBUG_SKIP_FHIR_DOCS set and no "
            "other content"
        )

    return entries

1578 

1579 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1580 # Generic 

1581 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1582 

1583 @property 

def fhir_when_task_created(self) -> str:
    """
    The task's creation time in a FHIR-compatible format: the result of
    ``isoformat()`` on ``when_created``.
    """
    created = self.when_created
    return created.isoformat()

1589 

def _get_fhir_detail_bundle_entries(
    self, req: "CamcopsRequest", recipient: "ExportRecipient"
) -> List[Dict]:
    """
    Returns the "detail" bundle entries for the task: at most one
    Observation carrying the task's SNOMED codes (as ObservationComponent
    objects; only if SNOMED is supported and the task offers codes),
    followed by whatever :meth:`get_fhir_extra_bundle_entries` provides.

    See:

    - https://www.hl7.org/fhir/terminologies-systems.html
    - https://www.hl7.org/fhir/observation.html#code-interop
    - https://www.hl7.org/fhir/observation.html#gr-comp

    In particular, whether information should be grouped into one
    Observation (via ObservationComponent objects) or as separate
    observations depends on whether it is conceptually independent. For
    example, for BMI, height and weight should be separate.
    """
    entries = []  # type: List[Dict]

    # SNOMED, as one observation with several components:
    if req.snomed_supported:
        components = [
            fhir_observation_component_from_snomed(req, expr)
            for expr in self.get_snomed_codes(req)
        ]  # type: List[Dict]
        if components:
            observable_entity = req.snomed(SnomedLookup.OBSERVABLE_ENTITY)
            coding_json = Coding(
                jsondict={
                    Fc.SYSTEM: Fc.CODE_SYSTEM_SNOMED_CT,
                    Fc.CODE: str(observable_entity.identifier),
                    Fc.DISPLAY: observable_entity.as_string(longform=True),
                    Fc.USER_SELECTED: False,
                }
            ).as_json()
            code_json = CodeableConcept(
                jsondict={
                    Fc.CODING: [coding_json],
                    Fc.TEXT: observable_entity.term,
                }
            ).as_json()
            snomed_observation = self._get_fhir_observation(
                req,
                recipient,
                obs_dict={
                    # "code" is mandatory even if there are components.
                    Fc.CODE: code_json,
                    Fc.COMPONENT: components,
                },
            )
            snomed_entry = make_fhir_bundle_entry(
                resource_type_url=Fc.RESOURCE_TYPE_OBSERVATION,
                identifier=self._get_fhir_observation_id(req, name="snomed"),
                resource=snomed_observation,
            )
            entries.append(snomed_entry)

    # Extra -- these can be very varied:
    entries += self.get_fhir_extra_bundle_entries(req, recipient)

    # Done
    return entries

1662 

1663 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1664 # Identifiers 

1665 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1666 

1667 # Generic: 

1668 

def _get_fhir_id_this_task_class(
    self,
    req: "CamcopsRequest",
    route_name: str,
    value_within_task_class: Union[int, str],
) -> Identifier:
    """
    Build a FHIR Identifier for something belonging to a task class in the
    abstract: the "system" URL is routed by table name only (the task
    class, not the task instance), and the "value" is the stringified
    ``value_within_task_class``.
    """
    system_url = req.route_url(
        route_name,
        table_name=self.tablename,  # to match ViewParam.TABLE_NAME
    )
    id_dict = {
        Fc.SYSTEM: system_url,
        Fc.VALUE: str(value_within_task_class),
    }
    return Identifier(jsondict=id_dict)

1688 

def _get_fhir_id_this_task_instance(
    self,
    req: "CamcopsRequest",
    route_name: str,
    value_within_task_instance: Union[int, str],
) -> Identifier:
    """
    Convenience builder for FHIR identifiers that mean "this task" and
    little more (they represent a type of thing of which there can only be
    one per task), yet use a variety of route names so that the FHIR URLs
    look sensible. The "system" URL routes to this specific task instance
    (table name plus server PK); the "value" is the stringified
    ``value_within_task_instance``.
    """
    system_url = req.route_url(
        route_name,
        table_name=self.tablename,  # to match ViewParam.TABLE_NAME
        server_pk=str(self._pk),  # to match ViewParam.SERVER_PK
    )
    id_dict = {
        Fc.SYSTEM: system_url,
        Fc.VALUE: str(value_within_task_instance),
    }
    return Identifier(jsondict=id_dict)

1713 

1714 # Specific: 

1715 

def _get_fhir_condition_id(
    self, req: "CamcopsRequest", name: Union[int, str]
) -> Identifier:
    """
    Build a FHIR Identifier for a Condition: it represents this task
    instance (via the ``FHIR_CONDITION`` route) and a named condition
    within it.
    """
    route = Routes.FHIR_CONDITION
    return self._get_fhir_id_this_task_instance(req, route, name)

1726 

def _get_fhir_docref_id(
    self, req: "CamcopsRequest", task_format: str
) -> Identifier:
    """
    Build a FHIR Identifier (e.g. for a DocumentReference collection)
    representing the view of this task instance in the given task format.
    """
    route = Routes.FHIR_DOCUMENT_REFERENCE
    return self._get_fhir_id_this_task_instance(req, route, task_format)

1737 

def _get_fhir_observation_id(
    self, req: "CamcopsRequest", name: str
) -> Identifier:
    """
    Build a FHIR Identifier for an Observation: it represents this task
    instance (via the ``FHIR_OBSERVATION`` route) and a named observation
    within it.
    """
    route = Routes.FHIR_OBSERVATION
    return self._get_fhir_id_this_task_instance(req, route, name)

1748 

def _get_fhir_practitioner_id(self, req: "CamcopsRequest") -> Identifier:
    """
    Build a FHIR Identifier for the clinician. (Clinicians are not
    sensibly made unique across tasks, so the identifier is specific to
    this task instance.)
    """
    route = Routes.FHIR_PRACTITIONER
    value = Fc.CAMCOPS_VALUE_CLINICIAN_WITHIN_TASK
    return self._get_fhir_id_this_task_instance(req, route, value)

1759 

def _get_fhir_questionnaire_id(self, req: "CamcopsRequest") -> Identifier:
    """
    Build a FHIR Identifier (e.g. for a Questionnaire) representing this
    task in the abstract.

    The value is ``<tablename>/<CamCOPS server version string>``. The
    version is included so that if any aspect changes (even the formatting
    of question text), a new version will be stored despite the
    "ifNoneExist" clause.
    """
    system_url = req.route_url(Routes.FHIR_QUESTIONNAIRE_SYSTEM)
    id_dict = {
        Fc.SYSTEM: system_url,
        Fc.VALUE: f"{self.tablename}/{CAMCOPS_SERVER_VERSION_STRING}",
    }
    return Identifier(jsondict=id_dict)

1775 

def _get_fhir_questionnaire_response_id(
    self, req: "CamcopsRequest"
) -> Identifier:
    """
    Build a FHIR Identifier (e.g. for a QuestionnaireResponse collection)
    representing this task instance. QuestionnaireResponse items are
    specific answers, not abstract descriptions.
    """
    route = Routes.FHIR_QUESTIONNAIRE_RESPONSE
    value = Fc.CAMCOPS_VALUE_QUESTIONNAIRE_RESPONSE_WITHIN_TASK
    return self._get_fhir_id_this_task_instance(req, route, value)

1789 

1790 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1791 # References to identifiers 

1792 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1793 

def _get_fhir_subject_ref(
    self, req: "CamcopsRequest", recipient: "ExportRecipient"
) -> Dict:
    """
    Build a reference to the patient, for use in "subject" fields, by
    delegating to the patient object.

    Must not be called for anonymous tasks (asserts ``has_patient``).
    """
    assert (
        self.has_patient
    ), "Don't call Task._get_fhir_subject_ref() for anonymous tasks"
    patient = self.patient
    return patient.get_fhir_subject_ref(req, recipient)

1804 

def _get_fhir_practitioner_ref(self, req: "CamcopsRequest") -> Dict:
    """
    Returns a reference to the clinician, for "practitioner" fields.

    The reference has type Practitioner and carries the identifier from
    :meth:`_get_fhir_practitioner_id`.

    Must not be called for tasks without a clinician (asserts
    ``has_clinician``).
    """
    # The message names this method; it previously cited a non-existent
    # "_get_fhir_clinician_ref()", which made failures hard to trace.
    assert self.has_clinician, (
        "Don't call Task._get_fhir_practitioner_ref() "
        "for tasks without a clinician"
    )
    return FHIRReference(
        jsondict={
            Fc.TYPE: Fc.RESOURCE_TYPE_PRACTITIONER,
            Fc.IDENTIFIER: self._get_fhir_practitioner_id(req).as_json(),
        }
    ).as_json()

1819 

1820 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1821 # DocumentReference 

1822 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1823 

def _get_fhir_docref_bundle_entry(
    self,
    req: "CamcopsRequest",
    recipient: "ExportRecipient",
    text_encoding: str = UTF8,
) -> Dict:
    """
    Returns a bundle entry for an attached document: a full representation
    of the task in the recipient's chosen task format (PDF, XML, or HTML).

    The document travels inside a DocumentReference. FHIR would (in theory)
    also allow the DocumentReference to point, via a URL, at a separate
    Binary object; here the base-64-encoded data is embedded directly.

    Args:
        req:
            the current request
        recipient:
            the export recipient; its ``task_format`` selects the format
        text_encoding:
            encoding used to convert XML/HTML text to bytes (not used for
            PDF, which is already binary)

    Raises:
        ValueError: if the recipient's task format is not PDF, XML, or HTML

    See:

    - https://fhirblog.com/2013/11/06/fhir-and-xds-submitting-a-document-from-a-document-source/
    - https://fhirblog.com/2013/11/12/the-fhir-documentreference-resource/
    - https://build.fhir.org/ig/HL7/US-Core/StructureDefinition-us-core-documentreference.html
    - https://build.fhir.org/ig/HL7/US-Core/clinical-notes-guidance.html
    """  # noqa

    # ---------------------------------------------------------------------
    # Work out the MIME type and the raw bytes of the document
    # ---------------------------------------------------------------------
    task_format = recipient.task_format
    if task_format == FileType.PDF:
        binary_data = self.get_pdf(req)
        content_type = MimeType.PDF
    elif task_format == FileType.XML:
        xml_options = TaskExportOptions(
            include_blobs=False,
            xml_include_ancillary=True,
            xml_include_calculated=True,
            xml_include_comments=True,
            xml_include_patient=True,
            xml_include_plain_columns=True,
            xml_include_snomed=True,
            xml_with_header_comments=True,
        )
        txt = self.get_xml(req, options=xml_options)
        content_type = MimeType.XML
        binary_data = txt.encode(text_encoding)
    elif task_format == FileType.HTML:
        txt = self.get_html(req)
        content_type = MimeType.HTML
        binary_data = txt.encode(text_encoding)
    else:
        raise ValueError(f"Unknown task format: {task_format!r}")

    # FHIR attachments carry base-64 text, not raw bytes.
    b64_encoded_str = b64encode(binary_data).decode(ASCII)

    # ---------------------------------------------------------------------
    # Assemble the DocumentReference
    # ---------------------------------------------------------------------
    docref_id = self._get_fhir_docref_id(req, task_format)
    # Metadata first...
    dr_dict = {
        Fc.DATE: self.fhir_when_task_created,
        Fc.DESCRIPTION: self.longname(req),
        Fc.DOCSTATUS: (
            Fc.DOCSTATUS_FINAL
            if self.is_finalized()
            else Fc.DOCSTATUS_PRELIMINARY
        ),
        Fc.MASTER_IDENTIFIER: docref_id.as_json(),
        Fc.STATUS: Fc.DOCSTATUS_CURRENT,
    }
    # ... then the content itself:
    attachment_json = Attachment(
        jsondict={
            Fc.CONTENT_TYPE: content_type,
            Fc.DATA: b64_encoded_str,
        }
    ).as_json()
    dr_dict[Fc.CONTENT] = [
        DocumentReferenceContent(
            jsondict={Fc.ATTACHMENT: attachment_json}
        ).as_json()
    ]
    # ... and the optional "who" metadata:
    if self.has_clinician:
        dr_dict[Fc.AUTHOR] = [self._get_fhir_practitioner_ref(req)]
    if self.has_patient:
        dr_dict[Fc.SUBJECT] = self._get_fhir_subject_ref(req, recipient)

    docref_json = DocumentReference(jsondict=dr_dict).as_json()
    return make_fhir_bundle_entry(
        resource_type_url=Fc.RESOURCE_TYPE_DOCUMENT_REFERENCE,
        identifier=docref_id,
        resource=docref_json,
    )

1916 

1917 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1918 # Observation 

1919 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1920 

def _get_fhir_observation(
    self,
    req: "CamcopsRequest",
    recipient: "ExportRecipient",
    obs_dict: Dict,
) -> Dict:
    """
    Finish off a partly built Observation for this task and return it as a
    dict in JSON format.

    The supplied dictionary is modified in place: the effective date/time
    (when the task was created) and the status (final if the task is
    finalized, otherwise preliminary) are set, and, if the task has a
    patient, so is the subject.

    Args:
        req:
            the current request
        recipient:
            the export recipient
        obs_dict:
            starting dictionary for the Observation; MODIFIED IN PLACE
    """
    when_created = self.fhir_when_task_created
    if self.is_finalized():
        obs_status = Fc.OBSSTATUS_FINAL
    else:
        obs_status = Fc.OBSSTATUS_PRELIMINARY
    obs_dict[Fc.EFFECTIVE_DATE_TIME] = when_created
    obs_dict[Fc.STATUS] = obs_status

    if self.has_patient:
        obs_dict[Fc.SUBJECT] = self._get_fhir_subject_ref(req, recipient)

    observation = Observation(jsondict=obs_dict)
    return observation.as_json()

1945 

1946 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1947 # Practitioner (clinician) 

1948 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1949 

def _get_fhir_clinician_bundle_entry(self, req: "CamcopsRequest") -> Dict:
    """
    Supplies information on the clinician associated with this task, as a
    FHIR Practitioner object (within a bundle).

    Args:
        req:
            the current request

    Returns:
        a bundle entry (dict) wrapping the Practitioner resource, identified
        by this task's practitioner identifier.

    Must only be called for tasks that have a clinician (this is asserted).
    """
    # The message names this method, so that a failure points at the real
    # call site.
    assert self.has_clinician, (
        "Don't call Task._get_fhir_clinician_bundle_entry() "
        "for tasks without a clinician"
    )
    practitioner = Practitioner(
        jsondict={
            Fc.NAME: [
                HumanName(
                    jsondict={Fc.TEXT: self.get_clinician_name()}
                ).as_json()
            ],
            # "qualification" is too structured.
            # There isn't anywhere to represent our other information, so
            # we jam it in to "telecom"/"other".
            Fc.TELECOM: [
                ContactPoint(
                    jsondict={
                        Fc.SYSTEM: Fc.TELECOM_SYSTEM_OTHER,
                        Fc.VALUE: self.get_clinician_fhir_telecom_other(
                            req
                        ),
                    }
                ).as_json()
            ],
        }
    ).as_json()
    return make_fhir_bundle_entry(
        resource_type_url=Fc.RESOURCE_TYPE_PRACTITIONER,
        identifier=self._get_fhir_practitioner_id(req),
        resource=practitioner,
    )

1986 

1987 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1988 # Questionnaire, QuestionnaireResponse 

1989 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

1990 

def _get_fhir_q_qr_bundle_entries(
    self, req: "CamcopsRequest", recipient: "ExportRecipient"
) -> Tuple[Optional[Dict], Optional[Dict]]:
    """
    Get a pair of FHIR bundle entries: ``questionnaire_bundle_entry,
    questionnaire_response_bundle_entry``.

    A Questionnaire object represents the task in the abstract;
    QuestionnaireResponse items represent each answered question for a
    specific task instance.

    Returns ``None, None`` if the task supplies no questionnaire data.
    """
    # The task describes itself, either by overriding
    # get_fhir_questionnaire() or via the default autodiscovery.
    aq_items = self.get_fhir_questionnaire(req)

    if not aq_items:
        # Nothing to export.
        if DEBUG_SHOW_FHIR_QUESTIONNAIRE:
            log.debug("No FHIR questionnaire data")
        return None, None

    if DEBUG_SHOW_FHIR_QUESTIONNAIRE:
        qa_str = "\n".join(f"- {str(x)}" for x in aq_items)
        log.debug(f"FHIR questions/answers:\n{qa_str}")

    # Split each answered question into its abstract (question) part and
    # its instance-specific (response) part, then wrap both up.
    question_items = [aq.questionnaire_item() for aq in aq_items]
    response_items = [aq.questionnaire_response_item() for aq in aq_items]
    questionnaire_entry = self._make_fhir_questionnaire_bundle_entry(
        req, question_items
    )
    response_entry = self._make_fhir_questionnaire_response_bundle_entry(
        req, recipient, response_items
    )
    return questionnaire_entry, response_entry

2026 

def _make_fhir_questionnaire_bundle_entry(
    self, req: "CamcopsRequest", q_items: List[Dict]
) -> Optional[Dict]:
    """
    Wrap the supplied Questionnaire items into a FHIR bundle entry that
    describes this task as a FHIR Questionnaire. This is the task in the
    abstract, not a particular task instance.

    Args:
        req:
            the current request
        q_items:
            Questionnaire items (one per question), in JSON dict form
    """
    # FHIR supports versioning of questionnaires. Might be useful if the
    # wording of questions change. Could either use FHIR's version
    # field or include the version in the identifier below. Either way
    # we'd need the version in the 'ifNoneExist' part of the request.
    q_identifier = self._get_fhir_questionnaire_id(req)

    # Other things we could add:
    # https://www.hl7.org/fhir/questionnaire.html
    #
    # date: Date last changed
    # useContext: https://www.hl7.org/fhir/metadatatypes.html#UsageContext
    help_url = self.help_url()
    q_jsondict = {
        # Computer-friendly name:
        Fc.NAME: self.shortname,
        # Human name:
        Fc.TITLE: self.longname(req),
        # Natural language description of the questionnaire:
        Fc.DESCRIPTION: help_url,
        # Use and/or publishing restrictions:
        Fc.COPYRIGHT: help_url,
        Fc.VERSION: CAMCOPS_SERVER_VERSION_STRING,
        # Could also be: draft, retired, unknown
        Fc.STATUS: Fc.QSTATUS_ACTIVE,
        Fc.ITEM: q_items,
    }
    questionnaire = Questionnaire(jsondict=q_jsondict)
    return make_fhir_bundle_entry(
        resource_type_url=Fc.RESOURCE_TYPE_QUESTIONNAIRE,
        identifier=q_identifier,
        resource=questionnaire.as_json(),
    )

2063 

def _make_fhir_questionnaire_response_bundle_entry(
    self,
    req: "CamcopsRequest",
    recipient: "ExportRecipient",
    qr_items: List[Dict],
) -> Dict:
    """
    Make a bundle entry from FHIR QuestionnaireResponse items (e.g. one for
    the response to each question in a questionnaire-style task).

    Args:
        req:
            the current request
        recipient:
            the export recipient (used for the patient/subject reference)
        qr_items:
            QuestionnaireResponse items, in JSON dict form
    """
    q_identifier = self._get_fhir_questionnaire_id(req)
    qr_identifier = self._get_fhir_questionnaire_response_id(req)

    # Status:
    # https://www.hl7.org/fhir/valueset-questionnaire-answers-status.html
    # It is probably undesirable to export tasks that are incomplete in the
    # sense of "not finalized". The user can control this (via the
    # FINALIZED_ONLY config option for exports). However, we also need to
    # handle finalized but incomplete data -- that is the final ("stopped")
    # case below.
    status = (
        Fc.QSTATUS_COMPLETED
        if self.is_complete()
        else Fc.QSTATUS_IN_PROGRESS
        if self.is_live_on_tablet()
        else Fc.QSTATUS_STOPPED
    )

    qr_jsondict = {
        # https://r4.smarthealthit.org does not like "questionnaire" in
        # this form:
        # FHIR Server; FHIR 4.0.0/R4; HAPI FHIR 4.0.0-SNAPSHOT)
        # error is:
        # Invalid resource reference found at
        # path[QuestionnaireResponse.questionnaire]- Resource type is
        # unknown or not supported on this server
        # - http://127.0.0.1:8000/fhir_questionnaire|phq9
        # http://hapi.fhir.org/baseR4/ (4.0.1 (R4)) is OK
        Fc.QUESTIONNAIRE: fhir_sysval_from_id(q_identifier),
        Fc.AUTHORED: self.fhir_when_task_created,
        Fc.STATUS: status,
        # TODO: Could also add:
        # https://www.hl7.org/fhir/questionnaireresponse.html
        # author: Person who received and recorded the answers
        # source: The person who answered the questions
        Fc.ITEM: qr_items,
    }
    if self.has_patient:
        subject_ref = self._get_fhir_subject_ref(req, recipient)
        qr_jsondict[Fc.SUBJECT] = subject_ref  # type: ignore[assignment]

    response_json = QuestionnaireResponse(qr_jsondict).as_json()
    return make_fhir_bundle_entry(
        resource_type_url=Fc.RESOURCE_TYPE_QUESTIONNAIRE_RESPONSE,
        identifier=qr_identifier,
        resource=response_json,
        identifier_is_list=False,
    )

2122 

2123 # ------------------------------------------------------------------------- 

2124 # FHIR: functions to override if desired 

2125 # ------------------------------------------------------------------------- 

2126 

def get_fhir_questionnaire(
    self, req: "CamcopsRequest"
) -> List[FHIRAnsweredQuestion]:
    """
    Describe this task for FHIR as a list of answered questions, covering
    both the task in the abstract (the questions) and this specific
    instance (the answers).

    The default implementation uses automatic discovery from the task's
    columns. May be overridden.
    """
    answered_questions = self._fhir_autodiscover(req)
    return answered_questions

2138 

def get_fhir_extra_bundle_entries(
    self, req: "CamcopsRequest", recipient: "ExportRecipient"
) -> List[Dict]:
    """
    Hook for tasks that want to contribute additional FHIR bundle entries.
    The default is to contribute none. (SNOMED-CT codes are done
    automatically; don't repeat those.)
    """
    no_extra_entries = []  # type: List[Dict]
    return no_extra_entries

2147 

def get_qtext(self, req: "CamcopsRequest", attrname: str) -> Optional[str]:
    """
    Returns the text associated with a particular question.

    The default implementation is a guess: it looks up the task's string
    named after the attribute itself, without asking for a default if that
    string is missing.

    Args:
        req:
            A :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
        attrname:
            Name of the attribute (field) on this task that represents the
            question.
    """
    question_text = self.xstring(
        req, attrname, provide_default_if_none=False
    )
    return question_text

2161 

def get_atext(
    self, req: "CamcopsRequest", attrname: str, answer_value: int
) -> Optional[str]:
    """
    Returns the text associated with a particular answer to a question.

    The default implementation is a guess: it looks up the task's string
    named ``<attrname>_a<answer_value>``, without asking for a default if
    that string is missing.

    Args:
        req:
            A :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
        attrname:
            Name of the attribute (field) on this task that represents the
            question.
        answer_value:
            Answer value.
    """
    return self.xstring(
        req,
        f"{attrname}_a{answer_value}",
        provide_default_if_none=False,
    )

2180 

2181 # ------------------------------------------------------------------------- 

2182 # FHIR automatic interrogation 

2183 # ------------------------------------------------------------------------- 

2184 

def _fhir_autodiscover(
    self, req: "CamcopsRequest"
) -> List[FHIRAnsweredQuestion]:
    """
    Inspect this task instance and create information about both the task
    in the abstract and the answers for this specific instance.

    Every column of the task, other than those in ``TASK_FREQUENT_FIELDS``,
    becomes one :class:`FHIRAnsweredQuestion`.

    Args:
        req:
            the current request (used to look up question/answer text)

    Raises:
        NotImplementedError: if a column has a type that cannot be mapped to
            a FHIR question/answer type
    """
    qa_items = []  # type: List[FHIRAnsweredQuestion]

    skip_fields = TASK_FREQUENT_FIELDS
    for attrname, column in gen_columns(self):
        if attrname in skip_fields:
            continue
        comment = column.comment
        coltype = column.type

        # -----------------------------------------------------------------
        # Question text
        # -----------------------------------------------------------------
        # Note that it's good to get the column comment in somewhere; these
        # often explain the meaning of the field quite well. It may or may
        # not be possible to get it into the option values -- many answer
        # types don't permit those. QuestionnaireItem records don't have a
        # comment field (see
        # https://www.hl7.org/fhir/questionnaire-definitions.html#Questionnaire.item),  # noqa
        # so the best we can do is probably to stuff it into the question
        # text, even if that causes some visual duplication.
        retrieved_qtext = self.get_qtext(req, attrname)
        qtext_components = []  # type: List[str]
        if retrieved_qtext:
            qtext_components.append(retrieved_qtext)
        if comment:
            qtext_components.append(f"[{comment}]")
        if not qtext_components:
            # Nothing descriptive is available: fall back to the attribute
            # name, and only if even that is empty, to the placeholder.
            qtext_components.append(attrname or FHIR_UNKNOWN_TEXT)
        qtext = " ".join(qtext_components)

        # -----------------------------------------------------------------
        # Question/answer types
        # -----------------------------------------------------------------
        int_type = isinstance(coltype, Integer)
        bool_type = (
            is_sqlatype_binary(coltype)
            or isinstance(coltype, Boolean)
            # For booleans represented as integers: it is better to be as
            # constraining as possible and say that only 0/1 options are
            # present by marking these as Boolean, which is less
            # complicated for the recipient than "integer but with possible
            # options 0 or 1". We will *also* show the possible options,
            # just to be clear.
        )
        if int_type:
            qtype = FHIRQuestionType.INTEGER
            atype = FHIRAnswerType.INTEGER
        elif isinstance(coltype, String):  # includes its subclass, Text
            qtype = FHIRQuestionType.STRING
            atype = FHIRAnswerType.STRING
        elif isinstance(coltype, Numeric):  # includes Float, Decimal
            qtype = FHIRQuestionType.QUANTITY
            atype = FHIRAnswerType.QUANTITY
        elif isinstance(
            coltype, (DateTime, PendulumDateTimeAsIsoTextColType)
        ):
            qtype = FHIRQuestionType.DATETIME
            atype = FHIRAnswerType.DATETIME
        elif isinstance(coltype, DateColType):
            qtype = FHIRQuestionType.DATE
            atype = FHIRAnswerType.DATE
        elif isinstance(coltype, Time):
            qtype = FHIRQuestionType.TIME
            atype = FHIRAnswerType.TIME
        elif bool_type:
            qtype = FHIRQuestionType.BOOLEAN
            atype = FHIRAnswerType.BOOLEAN
        else:
            raise NotImplementedError(f"Unknown column type: {coltype!r}")

        # -----------------------------------------------------------------
        # MCQ options (only for integer/boolean columns that carry a
        # permitted-value checker)
        # -----------------------------------------------------------------
        answer_options = None  # type: Optional[Dict[Any, str]]
        pvc = None  # type: Optional[PermittedValueChecker]
        if int_type or bool_type:
            pvc = getattr(column, COLATTR_PERMITTED_VALUE_CHECKER, None)
        if pvc is not None:
            pv = pvc.permitted_values_inc_minmax()
            if pv:
                # Has to be of type "choice" to transmit the possible
                # values.
                qtype = FHIRQuestionType.CHOICE
                answer_options = {
                    v: (
                        self.get_atext(req, attrname, v)
                        or comment
                        or FHIR_UNKNOWN_TEXT
                    )
                    for v in pv
                }

        # -----------------------------------------------------------------
        # Assemble
        # -----------------------------------------------------------------
        qa_items.append(
            FHIRAnsweredQuestion(
                qname=attrname,
                qtext=qtext,
                qtype=qtype,
                answer_type=atype,
                answer=getattr(self, attrname),
                answer_options=answer_options,
            )
        )

    # We don't currently put any summary information into FHIR exports. I
    # think that isn't within the spirit of the system, but am not sure.
    # todo: Check if summary information should go into FHIR exports.

    return qa_items

2299 

2300 # ------------------------------------------------------------------------- 

2301 # Export (generically) 

2302 # ------------------------------------------------------------------------- 

2303 

def cancel_from_export_log(
    self, req: "CamcopsRequest", from_console: bool = False
) -> None:
    """
    Flag every not-yet-cancelled export log entry for this task as
    "cancelled" (stamped with the request's current UTC time), so that the
    task will be resent. The action is audited.

    Does nothing if the task has no server PK.

    Args:
        req:
            the current request
        from_console:
            passed to the audit call
    """
    task_pk = self._pk
    if task_pk is None:
        return
    from camcops_server.cc_modules.cc_exportmodels import (
        ExportedTask,
    )  # delayed import

    # "NOT cancelled OR cancelled IS NULL"; the NULL check must be explicit:
    # https://stackoverflow.com/questions/37445041/sqlalchemy-how-to-filter-column-which-contains-both-null-and-integer-values  # noqa
    not_yet_cancelled = (
        not_(ExportedTask.cancelled) | ExportedTask.cancelled.is_(None)
    )
    # noinspection PyUnresolvedReferences
    statement = (
        update(ExportedTask.__table__)  # type: ignore[arg-type]
        .where(ExportedTask.basetable == self.tablename)
        .where(ExportedTask.task_server_pk == task_pk)
        .where(not_yet_cancelled)
        .values(cancelled=1, cancelled_at_utc=req.now_utc)
    )
    req.dbsession.execute(statement)
    self.audit(
        req,
        "Task cancelled in export log (may trigger resending)",
        from_console,
    )

2335 

2336 # ------------------------------------------------------------------------- 

2337 # Audit 

2338 # ------------------------------------------------------------------------- 

2339 

def audit(
    self, req: "CamcopsRequest", details: str, from_console: bool = False
) -> None:
    """
    Record an audit entry about an action on this task, tagged with the
    task's patient server PK, table name, and server PK.

    Args:
        req:
            the current request
        details:
            text of the audit entry
        from_console:
            passed through to the audit function
    """
    patient_pk = self.get_patient_server_pk()
    table_name = self.tablename
    task_pk = self._pk
    audit(
        req,
        details,
        patient_server_pk=patient_pk,
        table=table_name,
        server_pk=task_pk,
        from_console=from_console,
    )

2354 

2355 # ------------------------------------------------------------------------- 

2356 # Erasure (wiping, leaving record as placeholder) 

2357 # ------------------------------------------------------------------------- 

2358 

def manually_erase(self, req: "CamcopsRequest") -> None:
    """
    Manually erases a task and everything in its lineage, each with its
    dependants.

    This WIPES THE CONTENTS but LEAVES THE RECORD AS A PLACEHOLDER.

    The erasure is audited, and the task is then cancelled in the export
    log so that it will be re-sent. WRITES TO DATABASE.
    """
    # Wipe ourself and the rest of our "family":
    for family_member in self.get_lineage():
        family_member.manually_erase_with_dependants(req)
    # Record what happened, then clear the export log:
    self.audit(req, "Task details erased manually")
    self.cancel_from_export_log(req)

2374 

def is_erased(self) -> bool:
    """
    Reports the task's "manually erased" flag. See :func:`manually_erase`.
    """
    erased = self._manually_erased
    return erased

2380 

2381 # ------------------------------------------------------------------------- 

2382 # Complete deletion 

2383 # ------------------------------------------------------------------------- 

2384 

def delete_entirely(self, req: "CamcopsRequest") -> None:
    """
    Deletes, with their dependants, this task and everything in its
    lineage; then audits the deletion.
    """
    for family_member in self.get_lineage():
        family_member.delete_with_dependants(req)
    self.audit(req, "Task deleted")

2392 

2393 # ------------------------------------------------------------------------- 

2394 # Filtering tasks for the task list 

2395 # ------------------------------------------------------------------------- 

2396 

@classmethod
def gen_text_filter_columns(
    cls,
) -> Generator[Tuple[str, Column], None, None]:
    """
    Generates ``attrname, column`` tuples for the columns that can be used
    for text filtering: those with a string type whose attribute name does
    not begin with an underscore (which marks a system field).
    """
    for attrname, column in gen_columns(cls):
        is_system_field = attrname.startswith("_")
        if not is_system_field and is_sqlatype_string(column.type):
            yield attrname, column

2411 

@classmethod
@cache_region_static.cache_on_arguments(function_key_generator=fkg)
def get_text_filter_columns(cls) -> List[Column]:
    """
    Cached function returning, as a list, the SQLAlchemy Column objects
    yielded by :meth:`gen_text_filter_columns` (i.e. those suitable for
    text filtering).
    """
    columns = []  # type: List[Column]
    for _attrname, column in cls.gen_text_filter_columns():
        columns.append(column)
    return columns

2420 

def contains_text(self, text: str) -> bool:
    """
    Does this task contain the specified text? The comparison is
    case-insensitive (both sides are lower-cased).

    Args:
        text:
            string that must be present in at least one of our text
            columns

    Returns:
        is the string present?
    """
    needle = text.lower()
    for attrname, _ in self.gen_text_filter_columns():
        value = getattr(self, attrname)
        if value is not None:
            assert isinstance(value, str), "Internal bug in contains_text"
            if needle in value.lower():
                return True
    return False

2442 

def contains_all_strings(self, strings: List[str]) -> bool:
    """
    Does this task contain all the specified strings?

    Args:
        strings:
            list of strings; each string must be present in at least
            one of our text columns

    Returns:
        are all strings present? (Trivially true for an empty list.)
    """
    for wanted in strings:
        if not self.contains_text(wanted):
            # One miss is enough; no need to check the rest.
            return False
    return True

2456 

2457 # ------------------------------------------------------------------------- 

2458 # Spreadsheet export for basic research dump 

2459 # ------------------------------------------------------------------------- 

2460 

def get_spreadsheet_pages(
    self, req: "CamcopsRequest"
) -> List["SpreadsheetPage"]:
    """
    Returns the spreadsheet pages for this task, as used for the basic
    research dump in (e.g.) TSV format.

    The first page is the task's core page, with the patient's page merged
    into it if there is a patient. It is followed by one page per ancillary
    object and then one page per summary table.
    """
    # 1. Our core fields, plus summary information
    main_page = self._get_core_spreadsheet_page(req)

    # 2. Patient details, folded into the main page
    patient = self.patient
    if patient:
        main_page.add_or_set_columns_from_page(
            patient.get_spreadsheet_page(req)
        )
    pages = [main_page]

    # 3. +/- Ancillary objects
    pages.extend(
        ancillary._get_core_spreadsheet_page(req)
        for ancillary in self.gen_ancillary_instances()
    )

    # 4. +/- Extra summary tables (inc. SNOMED)
    pages.extend(
        est.get_spreadsheet_page()
        for est in self.get_all_summary_tables(req)
    )

    return pages

2491 

def get_spreadsheet_schema_elements(
    self, req: "CamcopsRequest"
) -> Set[SummarySchemaInfo]:
    """
    Returns schema information used for spreadsheets -- more than just
    the database columns, and in the same format as the spreadsheets.

    The result combines the core schema of the main table, the task's
    summaries, the patient's schema elements (if there is a patient), the
    core schema of each ancillary object, and the schema elements of each
    summary table.
    """
    table_name = self.__tablename__

    # 1(a). Database columns: main table
    schema = self._get_core_spreadsheet_schema()
    # 1(b). Summary information
    schema.update(
        SummarySchemaInfo.from_summary_element(table_name, summary)
        for summary in self.get_summaries(req)
    )

    # 2. Patient details
    patient = self.patient
    if patient:
        schema.update(
            patient.get_spreadsheet_schema_elements(req, table_name)
        )

    # 3. Ancillary objects
    for ancillary in self.gen_ancillary_instances():
        # type: GenericTabletRecordMixin
        schema.update(ancillary._get_core_spreadsheet_schema())

    # 4. Extra summary tables
    for summary_table in self.get_all_summary_tables(req):
        schema.update(summary_table.get_spreadsheet_schema_elements())

    return schema

2526 

2527 # ------------------------------------------------------------------------- 

2528 # XML view 

2529 # ------------------------------------------------------------------------- 

2530 

def get_xml(
    self,
    req: "CamcopsRequest",
    options: TaskExportOptions = None,
    indent_spaces: int = 4,
    eol: str = "\n",
) -> str:
    """
    Returns XML describing the task.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        options: a :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions`;
            if not supplied, default options are used

        indent_spaces: number of spaces to indent formatted XML
        eol: end-of-line string

    Returns:
        the XML document representing the task, as a string.

    """  # noqa
    if not options:
        options = TaskExportOptions()
    root = self.get_xml_root(req=req, options=options)
    return get_xml_document(
        root,
        indent_spaces=indent_spaces,
        eol=eol,
        include_comments=options.xml_include_comments,
    )

2560 

def get_xml_root(
    self, req: "CamcopsRequest", options: TaskExportOptions
) -> XmlElement:
    """
    Returns the root of an XML tree for this task: a
    :class:`camcops_server.cc_modules.cc_xml.XmlElement` named after the
    task's table, whose value is the list of core branches.

    Override to include other tables, or to deal with BLOBs, if the default
    methods are insufficient.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        options: a :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions`
    """  # noqa
    # Core (inc. core BLOBs)
    core_branches = self._get_xml_core_branches(req=req, options=options)
    return XmlElement(name=self.tablename, value=core_branches)

2579 

def _get_xml_core_branches(
    self, req: "CamcopsRequest", options: TaskExportOptions = None
) -> List[XmlElement]:
    """
    Returns a list of :class:`camcops_server.cc_modules.cc_xml.XmlElement`
    elements representing stored, calculated, patient, and/or BLOB fields,
    depending on the options.

    The branches appear in this order: stored (+/- calculated) values;
    SNOMED-CT codes; special notes; patient details; BLOBs; ancillary
    objects (grouped by relationship name); extra summary tables.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        options: a :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions`;
            if not supplied, a default set is used (plain columns,
            ancillaries, calculated values, patient and SNOMED included;
            BLOBs excluded)
    """  # noqa
    options = options or TaskExportOptions(
        xml_include_plain_columns=True,
        xml_include_ancillary=True,
        include_blobs=False,
        xml_include_calculated=True,
        xml_include_patient=True,
        xml_include_snomed=True,
    )

    # Stored values +/- calculated values. BLOBs are deliberately excluded
    # here; they are handled in their own section below.
    core_options = options.clone()
    core_options.include_blobs = False
    branches = self._get_xml_branches(req=req, options=core_options)

    def add_comment(comment: XmlLiteral) -> None:
        # Section header comments are optional.
        if options.xml_with_header_comments:
            branches.append(comment)

    # SNOMED-CT codes
    if options.xml_include_snomed and req.snomed_supported:
        add_comment(XML_COMMENT_SNOMED_CT)
        snomed_branches = [
            code.xml_element() for code in self.get_snomed_codes(req)
        ]  # type: List[XmlElement]
        branches.append(
            XmlElement(name=XML_NAME_SNOMED_CODES, value=snomed_branches)
        )

    # Special notes
    add_comment(XML_COMMENT_SPECIAL_NOTES)
    for sn in self.special_notes:
        branches.append(sn.get_xml_root())

    # Patient details
    if self.is_anonymous:
        add_comment(XML_COMMENT_ANONYMOUS)
    elif options.xml_include_patient:
        add_comment(XML_COMMENT_PATIENT)
        patient_options = TaskExportOptions(
            xml_include_plain_columns=True,
            xml_with_header_comments=options.xml_with_header_comments,
        )
        if self.patient:
            branches.append(
                self.patient.get_xml_root(req, patient_options)
            )

    # BLOBs
    if options.include_blobs:
        add_comment(XML_COMMENT_BLOBS)
        blob_options = TaskExportOptions(
            include_blobs=True,
            xml_skip_fields=options.xml_skip_fields,
            xml_sort_by_name=True,
            xml_with_header_comments=False,
        )
        branches += self._get_xml_branches(req=req, options=blob_options)

    # Ancillary objects
    if options.xml_include_ancillary:
        ancillary_options = TaskExportOptions(
            xml_include_plain_columns=True,
            xml_include_ancillary=True,
            include_blobs=options.include_blobs,
            xml_include_calculated=options.xml_include_calculated,
            xml_sort_by_name=True,
            xml_with_header_comments=options.xml_with_header_comments,
        )
        item_collections = []  # type: List[XmlElement]
        found_ancillary = False
        # We use a slightly more manual iteration process here so that
        # we iterate through individual ancillaries but clustered by their
        # name (e.g. if we have 50 trials and 5 groups, we do them in
        # collections).
        for attrname, rel_prop, _rel_cls in gen_ancillary_relationships(
            self
        ):
            if not found_ancillary:
                add_comment(XML_COMMENT_ANCILLARY)
                found_ancillary = True
            related = getattr(self, attrname)
            if rel_prop.uselist:
                ancillaries = related  # type: List[GenericTabletRecordMixin]
            elif related is None:
                # NOTE(review): assumes a scalar relationship with no
                # related record yields None; emit an empty collection
                # rather than calling a method on None.
                ancillaries = []
            else:
                ancillaries = [related]
            itembranches = [
                ancillary._get_xml_root(req=req, options=ancillary_options)
                for ancillary in ancillaries
            ]  # type: List[XmlElement]
            item_collections.append(
                XmlElement(name=attrname, value=itembranches)
            )
        item_collections.sort(key=lambda el: el.name)
        branches += item_collections

    # Completely separate additional summary tables
    if options.xml_include_calculated:
        summary_collections = []  # type: List[XmlElement]
        found_est = False
        for est in self.get_extra_summary_tables(req):
            # ... not get_all_summary_tables(); we handled SNOMED
            # differently, above
            if not found_est and est.rows:
                # The header comment is written only once, and only when a
                # table that actually has rows is encountered.
                add_comment(XML_COMMENT_CALCULATED)
                found_est = True
            summary_collections.append(est.get_xml_element())
        summary_collections.sort(key=lambda el: el.name)
        branches += summary_collections

    return branches

2708 

2709 # ------------------------------------------------------------------------- 

2710 # HTML view 

2711 # ------------------------------------------------------------------------- 

2712 

def get_html(self, req: "CamcopsRequest", anonymise: bool = False) -> str:
    """
    Returns HTML representing the task, for our HTML view, by rendering
    the ``task.mako`` template (without a signature block).

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        anonymise: hide patient identifying details?
    """
    req.prepare_for_html_figures()
    template_values = {
        "task": self,
        "anonymise": anonymise,
        "signature": False,
        "viewtype": ViewArg.HTML,
    }
    return render("task.mako", template_values, request=req)

2732 

def title_for_html(
    self, req: "CamcopsRequest", anonymise: bool = False
) -> str:
    """
    Builds the plain-text title used for the HTML ``<title>`` block (by
    ``task.mako``) and for the title of PDF exports.

    The result must remain plain text (no markup).

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        anonymise: hide patient identifying details?

    Returns:
        a string of the form ``CamCOPS: <patient>; <table>; <when>``
    """
    if anonymise:
        patient_part = "?"
    elif self.patient:
        patient_part = self.patient.prettystr(req)
    else:
        # No patient attached to this task: use a translated placeholder.
        _ = req.gettext
        patient_part = _("Anonymous")
    table = self.tablename
    created = format_datetime(
        self.get_creation_datetime(),
        DateFormat.ISO8601_HUMANIZED_TO_MINUTES,
        "",
    )
    return f"CamCOPS: {patient_part}; {table}; {created}"

2760 

2761 # ------------------------------------------------------------------------- 

2762 # PDF view 

2763 # ------------------------------------------------------------------------- 

2764 

def get_pdf(self, req: "CamcopsRequest", anonymise: bool = False) -> bytes:
    """
    Produces a PDF representing the task.

    When ``CSS_PAGED_MEDIA`` is set, the main HTML alone is converted.
    Otherwise, separately rendered header and footer HTML and a page
    orientation option are also passed to the converter.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        anonymise: hide patient identifying details?

    Returns:
        the PDF, as bytes
    """
    main_html = self.get_pdf_html(req, anonymise=anonymise)
    if CSS_PAGED_MEDIA:
        return pdf_from_html(req, html=main_html)

    # Header: task-specific inner text wrapped by the wkhtmltopdf template.
    header_inner = render(
        "task_page_header.mako",
        dict(task=self, anonymise=anonymise),
        request=req,
    )
    header_html = render(
        "wkhtmltopdf_header.mako",
        dict(inner_text=header_inner),
        request=req,
    )

    # Footer: built the same way, but without the anonymise flag.
    footer_inner = render(
        "task_page_footer.mako",
        dict(task=self),
        request=req,
    )
    footer_html = render(
        "wkhtmltopdf_footer.mako",
        dict(inner_text=footer_inner),
        request=req,
    )

    if self.use_landscape_for_pdf:
        orientation = "Landscape"
    else:
        orientation = "Portrait"

    return pdf_from_html(
        req,
        html=main_html,
        header_html=header_html,
        footer_html=footer_html,
        extra_wkhtmltopdf_options={"orientation": orientation},
    )

2810 

def get_pdf_html(
    self, req: "CamcopsRequest", anonymise: bool = False
) -> str:
    """
    Renders the HTML from which the PDF is made. This uses the same
    ``task.mako`` template as the HTML view, but with PDF-specific values
    (landscape flag, signature block for tasks with a clinician, PDF view
    type).

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        anonymise: hide patient identifying details?

    Returns:
        the rendered HTML
    """
    # Figure preparation is requested before the template is rendered.
    req.prepare_for_pdf_figures()
    template_values = {
        "task": self,
        "anonymise": anonymise,
        "pdf_landscape": self.use_landscape_for_pdf,
        "signature": self.has_clinician,
        "viewtype": ViewArg.PDF,
    }
    return render("task.mako", template_values, request=req)

2830 

def suggested_pdf_filename(
    self, req: "CamcopsRequest", anonymise: bool = False
) -> str:
    """
    Suggests a filename for the PDF copy of this task (for downloads),
    built by ``get_export_filename`` from the server's configured filename
    specifications and this task's patient/task details.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        anonymise: hide patient identifying details?

    Returns:
        the suggested filename
    """
    config = req.config
    # A request to anonymise overrides the task's own anonymity flag.
    treat_as_anonymous = True if anonymise else self.is_anonymous
    patient = self.patient
    if patient:
        surname = patient.get_surname()
        forename = patient.get_forename()
        dob = patient.get_dob()
        sex = patient.get_sex()
        idnum_objects = patient.get_idnum_objects()
    else:
        # No patient: blank names, and nothing for the other details.
        surname = ""
        forename = ""
        dob = None
        sex = None
        idnum_objects = None
    return get_export_filename(
        req=req,
        patient_spec_if_anonymous=config.patient_spec_if_anonymous,
        patient_spec=config.patient_spec,
        filename_spec=config.task_filename_spec,
        filetype=ViewArg.PDF,
        is_anonymous=treat_as_anonymous,
        surname=surname,
        forename=forename,
        dob=dob,
        sex=sex,
        idnum_objects=idnum_objects,
        creation_datetime=self.get_creation_datetime(),
        basetable=self.tablename,
        serverpk=self._pk,
    )

2863 

def write_pdf_to_disk(self, req: "CamcopsRequest", filename: str) -> None:
    """
    Writes the PDF to disk, using ``filename``.

    The PDF is generated before the file is opened, so a failure during
    generation does not leave behind an empty or truncated file. The file
    is always closed, even if the write fails.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        filename: path of the file to create or overwrite (binary mode)
    """
    pdf = self.get_pdf(req)
    with open(filename, "wb") as pdffile:
        pdffile.write(pdf)

2870 

2871 # ------------------------------------------------------------------------- 

2872 # Metadata for e.g. RiO 

2873 # ------------------------------------------------------------------------- 

2874 

def get_rio_metadata(
    self,
    req: "CamcopsRequest",
    which_idnum: int,
    uploading_user_id: str,
    document_type: str,
) -> str:
    """
    Returns metadata for the task that Servelec's RiO electronic patient
    record may want.

    The eight values (client ID, user ID, document type, title,
    description, author, document date, final-revision flag) are each
    converted to ASCII, wrapped in double quotes, and joined with commas.
    If the client ID lookup raises ``AttributeError``, a blank client ID is
    used.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        which_idnum: which CamCOPS ID number type corresponds to the RiO
            client ID?
        uploading_user_id: RiO user ID (string) of the user who will
            be recorded as uploading this information; see below
        document_type: a string indicating the RiO-defined document type
            (this is system-specific); see below

    Returns:
        a newline-terminated single line of CSV values; see below

    Called by
    :meth:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup.export_task`.

    From Servelec (Lee Meredith) to Rudolf Cardinal, 2014-12-04:

    .. code-block:: none

        Batch Document Upload

        The RiO batch document upload function can be used to upload
        documents in bulk automatically. RiO includes a Batch Upload
        windows service which monitors a designated folder for new files.
        Each file which is scanned must be placed in the designated folder
        along with a meta-data file which describes the document. So
        essentially if a document had been scanned in and was called
        ‘ThisIsANewReferralLetterForAPatient.pdf’ then there would also
        need to be a meta file in the same folder called
        ‘ThisIsANewReferralLetterForAPatient.metadata’. The contents of
        the meta file would need to include the following:

            Field Order; Field Name; Description; Data Mandatory (Y/N);
            Format

            1; ClientID; RiO Client ID which identifies the patient in RiO
            against which the document will be uploaded.; Y; 15
            Alphanumeric Characters

            2; UserID; User ID of the uploaded document, this is any user
            defined within the RiO system and can be a single system user
            called ‘AutomaticDocumentUploadUser’ for example.; Y; 10
            Alphanumeric Characters

                [NB example longer than that!]

            3; DocumentType; The RiO defined document type eg: APT; Y; 80
            Alphanumeric Characters

            4; Title; The title of the document; N; 40 Alphanumeric
            Characters

            5; Description; The document description.; N; 500 Alphanumeric
            Characters

            6; Author; The author of the document; N; 80 Alphanumeric
            Characters

            7; DocumentDate; The date of the document; N; dd/MM/yyyy HH:mm

            8; FinalRevision; The revision values are 0 Draft or 1 Final,
            this is defaulted to 1 which is Final revision.; N; 0 or 1

        As an example, this is what would be needed in a meta file:

            “1000001”,”TRUST1”,”APT”,”A title”, “A description of the
            document”, “An author”,”01/12/2012 09:45”,”1”

        (on one line)

    Clarification, from Lee Meredith to Rudolf Cardinal, 2015-02-18:

    - metadata files must be plain ASCII, not UTF-8

      - ... here and
        :meth:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup.export_task`

    - line terminator is <CR>

      - BUT see
        :meth:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup.export_task`

    - user name limit is 10 characters, despite incorrect example

      - search for ``RIO_MAX_USER_LEN``

    - DocumentType is a code that maps to a human-readable document
      type; for example, "APT" might map to "Appointment Letter". These
      mappings are specific to the local system. (We will probably want
      one that maps to "Clinical Correspondence" in the absence of
      anything more specific.)

    - RiO will delete the files after it's processed them.

    - Filenames should avoid spaces, but otherwise any other standard
      ASCII code is fine within filenames.

      - see
        :meth:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup.export_task`

    """

    try:
        client_id = str(self.patient.get_idnum_value(which_idnum))
    except AttributeError:
        # Fall back to a blank client ID rather than failing the export.
        client_id = ""
    title = "CamCOPS_" + self.shortname
    description = self.longname(req)
    author = self.get_clinician_name()  # may be blank
    # This STRIPS the timezone information; i.e. it is in the local
    # timezone but doesn't tell you which timezone that is. (That's fine;
    # it should be local or users would be confused.)
    document_date = format_datetime(
        self.when_created, DateFormat.RIO_EXPORT_UK
    )
    # 0 = draft (task still live on the tablet); 1 = final.
    if self.is_live_on_tablet():
        final_revision = 0
    else:
        final_revision = 1

    # Field order follows the RiO specification quoted in the docstring.
    fields = [
        client_id,
        uploading_user_id,
        document_type,
        title,
        description,
        author,
        document_date,
        final_revision,
    ]
    # UTF-8 is NOT supported by RiO for metadata. So:
    quoted_fields = []
    for field in fields:
        quoted_fields.append(f'"{mangle_unicode_to_ascii(field)}"')
    return ",".join(quoted_fields) + "\n"

3018 

3019 # ------------------------------------------------------------------------- 

3020 # HTML elements used by tasks 

3021 # ------------------------------------------------------------------------- 

3022 

3023 # noinspection PyMethodMayBeStatic 

def get_standard_clinician_comments_block(
    self, req: "CamcopsRequest", comments: str
) -> str:
    """
    Renders the clinician's comments through ``clinician_comments.mako``.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        comments: the comment text, passed to the template as ``comment``

    Returns:
        the rendered HTML
    """
    template_values = {"comment": comments}
    return render("clinician_comments.mako", template_values, request=req)

3033 

def get_is_complete_td_pair(self, req: "CamcopsRequest") -> str:
    """
    Builds a pair of HTML table cells saying whether the task is complete.
    When it is not, the answer cell carries the "incomplete" CSS class so
    that this is very obvious visually.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

    Returns:
        two ``<td>`` elements (label, answer) as one string
    """
    complete = self.is_complete()
    if complete:
        td_attributes = ""
    else:
        td_attributes = f' class="{CssClass.INCOMPLETE}"'
    answer = get_yes_no(req, complete)
    return f"<td>Completed?</td><td{td_attributes}><b>{answer}</b></td>"

3045 

def get_is_complete_tr(self, req: "CamcopsRequest") -> str:
    """
    Builds an HTML table row saying whether the task is complete, by
    wrapping the cells from :meth:`get_is_complete_td_pair` in ``<tr>``.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

    Returns:
        a ``<tr>`` element as a string
    """
    cells = self.get_is_complete_td_pair(req)
    return f"<tr>{cells}</tr>"

3052 

def get_twocol_val_row(
    self, fieldname: str, default: str = None, label: str = None
) -> str:
    """
    HTML table row, two columns (label, value), built by ``tr_qa``, with a
    substitute for missing values.

    NOTE(review): previously described as "without web-safing of value";
    whether the value is escaped depends on ``tr_qa`` -- confirm there.

    Args:
        fieldname: field (attribute) name; the value will be retrieved
            from this attribute
        default: what to show instead if the value is ``None``
        label: descriptive label; the field name is used if this is
            ``None``

    Returns:
        two-column HTML table row (label, value)

    """
    value = getattr(self, fieldname)
    shown = default if value is None else value
    heading = fieldname if label is None else label
    return tr_qa(heading, shown)

3075 

def get_twocol_string_row(self, fieldname: str, label: str = None) -> str:
    """
    HTML table row, two columns (label, value), built by ``tr_qa`` from
    the raw attribute value.

    NOTE(review): previously described as "with web-safing of value";
    whether the value is escaped depends on ``tr_qa`` -- confirm there.

    Args:
        fieldname: field (attribute) name; the value will be retrieved
            from this attribute
        label: descriptive label; the field name is used if this is
            ``None``

    Returns:
        two-column HTML table row (label, value)
    """
    heading = fieldname if label is None else label
    value = getattr(self, fieldname)
    return tr_qa(heading, value)

3091 

def get_twocol_bool_row(
    self, req: "CamcopsRequest", fieldname: str, label: str = None
) -> str:
    """
    HTML table row, two columns, with the value formatted by
    ``get_yes_no_none`` (a yes/no-style formatter).

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        fieldname: field (attribute) name; the value will be retrieved
            from this attribute
        label: descriptive label; the field name is used if this is
            ``None``

    Returns:
        two-column HTML table row (label, value)
    """
    heading = fieldname if label is None else label
    answer = get_yes_no_none(req, getattr(self, fieldname))
    return tr_qa(heading, answer)

3110 

def get_twocol_bool_row_true_false(
    self, req: "CamcopsRequest", fieldname: str, label: str = None
) -> str:
    """
    HTML table row, two columns, with the value formatted by
    ``get_true_false_none`` (a true/false-style formatter).

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        fieldname: field (attribute) name; the value will be retrieved
            from this attribute
        label: descriptive label; the field name is used if this is
            ``None``

    Returns:
        two-column HTML table row (label, value)
    """
    heading = fieldname if label is None else label
    answer = get_true_false_none(req, getattr(self, fieldname))
    return tr_qa(heading, answer)

3130 

def get_twocol_bool_row_present_absent(
    self, req: "CamcopsRequest", fieldname: str, label: str = None
) -> str:
    """
    HTML table row, two columns, with the value formatted by
    ``get_present_absent_none`` (a present/absent-style formatter).

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        fieldname: field (attribute) name; the value will be retrieved
            from this attribute
        label: descriptive label; the field name is used if this is
            ``None``

    Returns:
        two-column HTML table row (label, value)
    """
    heading = fieldname if label is None else label
    answer = get_present_absent_none(req, getattr(self, fieldname))
    return tr_qa(heading, answer)

3152 

3153 @staticmethod 

def get_twocol_picture_row(blob: Optional[Blob], label: str) -> str:
    """
    HTML table row, two columns, with the label on the left and the
    picture HTML produced by ``get_blob_img_html`` on the right.

    Args:
        blob: the :class:`camcops_server.cc_modules.cc_blob.Blob` object
            (may be ``None``)
        label: descriptive label

    Returns:
        two-column HTML table row (label, picture)
    """
    picture_html = get_blob_img_html(blob)
    return tr(label, picture_html)

3166 

3167 # ------------------------------------------------------------------------- 

3168 # Field helper functions for subclasses 

3169 # ------------------------------------------------------------------------- 

3170 

def get_values(self, fields: List[str]) -> List:
    """
    Looks up each named attribute on this object.

    Args:
        fields: attribute names

    Returns:
        the attribute values, in the same order as ``fields``
    """
    values = []
    for fieldname in fields:
        values.append(getattr(self, fieldname))
    return values

3176 

def is_field_not_none(self, field: str) -> bool:
    """
    Does the named attribute hold something other than ``None``?
    """
    value = getattr(self, field)
    return value is not None

3182 

def any_fields_none(self, fields: List[str]) -> bool:
    """
    Is at least one of the named attributes ``None``? (``False`` for an
    empty list.)
    """
    return any(getattr(self, fieldname) is None for fieldname in fields)

3191 

def all_fields_not_none(self, fields: List[str]) -> bool:
    """
    Do all the named attributes hold something other than ``None``?
    This is the negation of :meth:`any_fields_none`.
    """
    some_missing = self.any_fields_none(fields)
    return not some_missing

3197 

def any_fields_null_or_empty_str(self, fields: List[str]) -> bool:
    """
    Is at least one of the named attributes either ``None`` or the empty
    string? (``False`` for an empty list.)
    """
    values = (getattr(self, fieldname) for fieldname in fields)
    return any(value is None or value == "" for value in values)

3207 

def are_all_fields_not_null_or_empty_str(self, fields: List[str]) -> bool:
    """
    Are all the named attributes neither ``None`` nor the empty string?
    This is the negation of :meth:`any_fields_null_or_empty_str`.
    """
    some_blank = self.any_fields_null_or_empty_str(fields)
    return not some_blank

3213 

def n_fields_not_none(self, fields: List[str]) -> int:
    """
    Counts the named attributes whose value is not ``None``.
    """
    return sum(
        1 for fieldname in fields if getattr(self, fieldname) is not None
    )

3223 

def n_fields_none(self, fields: List[str]) -> int:
    """
    Counts the named attributes whose value is ``None``.
    """
    return sum(
        1 for fieldname in fields if getattr(self, fieldname) is None
    )

3233 

def count_booleans(self, fields: List[str]) -> int:
    """
    Counts the named attributes whose value is truthy.
    """
    return sum(1 for fieldname in fields if getattr(self, fieldname))

3244 

def all_truthy(self, fields: List[str]) -> bool:
    """
    Are all the named attributes truthy? (``True`` for an empty list.)
    """
    return all(getattr(self, fieldname) for fieldname in fields)

3254 

def count_where(self, fields: List[str], wherevalues: List[Any]) -> int:
    """
    Counts how many of the named attributes hold a value that appears in
    ``wherevalues``.
    """
    n_matching = 0
    for value in self.get_values(fields):
        if value in wherevalues:
            n_matching += 1
    return n_matching

3260 

def count_wherenot(self, fields: List[str], notvalues: List[Any]) -> int:
    """
    Counts how many of the named attributes hold a value that does NOT
    appear in ``notvalues``.
    """
    n_other = 0
    for value in self.get_values(fields):
        if value not in notvalues:
            n_other += 1
    return n_other

3267 

3268 @staticmethod 

def sum_values(
    values: List[Union[int, float, None]],
    ignorevalues: List[Union[int, float, None]] = None,
) -> Union[int, float, None]:
    """
    Adds up ``values``, leaving out any value found in ``ignorevalues``.

    Args:
        values: the values to sum
        ignorevalues: values to skip; if not specified, ``[None]`` is
            used (an explicit empty list means "skip nothing")

    Returns:
        the sum, or ``None`` if summing raises ``TypeError``
    """
    # Test against None explicitly: an empty list is a valid argument.
    excluded = [None] if ignorevalues is None else ignorevalues
    to_sum = []
    for value in values:
        if value not in excluded:
            to_sum.append(value)
    try:
        total = sum(to_sum)
    except TypeError:
        return None
    return total

3284 

def sum_fields(
    self,
    fields: List[str],
    ignorevalues: List[Union[int, float, None]] = None,
) -> Union[int, float, None]:
    """
    Adds up the values of the named attributes, via :meth:`sum_values`.

    Args:
        fields: attribute names
        ignorevalues: passed through to :meth:`sum_values`

    Returns:
        whatever :meth:`sum_values` returns for those values
    """
    field_values = []
    for fieldname in fields:
        field_values.append(getattr(self, fieldname))
    return self.sum_values(field_values, ignorevalues=ignorevalues)

3296 

3297 @staticmethod 

def mean_values(
    values: List[Any], ignorevalues: List[Union[int, float, None]] = None
) -> Union[int, float, None]:
    """
    Averages ``values``, leaving out any value found in ``ignorevalues``.

    Args:
        values: the values to average
        ignorevalues: values to skip; if not specified, ``[None]`` is
            used (an explicit empty list means "skip nothing")

    Returns:
        the mean, or ``None`` if ``statistics.mean`` raises ``TypeError``
        or ``statistics.StatisticsError`` (e.g. nothing left to average)
    """
    # Test against None explicitly: an empty list is a valid argument.
    excluded = [None] if ignorevalues is None else ignorevalues
    to_average = []
    for value in values:
        if value not in excluded:
            to_average.append(value)
    try:
        average = statistics.mean(to_average)
    except (TypeError, statistics.StatisticsError):
        return None
    return average

3312 

def mean_fields(
    self,
    fields: List[str],
    ignorevalues: List[Union[int, float, None]] = None,
) -> Union[int, float, None]:
    """
    Averages the values of the named attributes, via :meth:`mean_values`.

    Args:
        fields: attribute names
        ignorevalues: passed through to :meth:`mean_values`

    Returns:
        whatever :meth:`mean_values` returns for those values
    """
    field_values = []
    for fieldname in fields:
        field_values.append(getattr(self, fieldname))
    return self.mean_values(field_values, ignorevalues=ignorevalues)

3325 

3326 @staticmethod 

def fieldnames_from_prefix(prefix: str, start: int, end: int) -> List[str]:
    """
    Makes field (column, attribute) names by appending consecutive
    integers to a prefix. For example,
    ``fieldnames_from_prefix("q", 1, 5)`` produces
    ``["q1", "q2", "q3", "q4", "q5"]``.

    Args:
        prefix: string prefix
        start: first value (inclusive)
        end: last value (inclusive)

    Returns:
        list of fieldnames, as above (empty if ``end < start``)

    """
    numbers = range(start, end + 1)
    return [prefix + str(number) for number in numbers]

3343 

3344 @staticmethod 

def fieldnames_from_list(
    prefix: str, suffixes: Iterable[Any]
) -> List[str]:
    """
    Makes fieldnames by appending each suffix, in turn, to the prefix.

    Args:
        prefix: string prefix
        suffixes: the suffixes; each is converted with ``str()``

    Returns:
        list of fieldnames, in the order of ``suffixes``

    """
    fieldnames = []
    for suffix in suffixes:
        fieldnames.append(prefix + str(suffix))
    return fieldnames

3361 

3362 # ------------------------------------------------------------------------- 

3363 # Extra strings 

3364 # ------------------------------------------------------------------------- 

3365 

3366 @classmethod 

def get_extrastring_taskname(cls) -> str:
    """
    Returns the taskname used as the top-level key for this task's extra
    strings (loaded by the server from XML files): the class's
    ``extrastring_taskname`` if that is truthy, otherwise its
    ``tablename``.
    """
    override = cls.extrastring_taskname
    return override if override else cls.tablename

3375 

3376 @classmethod 

def extrastrings_exist(cls, req: "CamcopsRequest") -> bool:
    """
    Asks the request whether any extra strings exist under this task's
    extra-string taskname.
    """
    taskname = cls.get_extrastring_taskname()
    return req.task_extrastrings_exist(taskname)

3382 

3383 @classmethod 

def wxstring(
    cls,
    req: "CamcopsRequest",
    name: str,
    defaultvalue: str = None,
    provide_default_if_none: bool = True,
) -> str:
    """
    Fetches an extra string for this task via ``req.wxstring`` (the
    web-safe variant).

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        name: name (second-level key) of the string, within the set of
            this task's extra strings
        defaultvalue: default to return if the string is not found
        provide_default_if_none: if ``True`` and ``defaultvalue`` is
            ``None``, a placeholder of the form ``[taskname: name]`` is
            used as the default instead
    """
    taskname = cls.get_extrastring_taskname()
    if provide_default_if_none and defaultvalue is None:
        defaultvalue = f"[{taskname}: {name}]"
    return req.wxstring(
        taskname,
        name,
        defaultvalue,
        provide_default_if_none=provide_default_if_none,
    )

3411 

3412 @classmethod 

def xstring(
    cls,
    req: "CamcopsRequest",
    name: str,
    defaultvalue: str = None,
    provide_default_if_none: bool = True,
) -> str:
    """
    Fetches an extra string for this task via ``req.xstring`` (the raw,
    not necessarily web-safe, variant).

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        name: name (second-level key) of the string, within the set of
            this task's extra strings
        defaultvalue: default to return if the string is not found
        provide_default_if_none: if ``True`` and ``defaultvalue`` is
            ``None``, a placeholder of the form ``[taskname: name]`` is
            used as the default instead
    """
    taskname = cls.get_extrastring_taskname()
    if provide_default_if_none and defaultvalue is None:
        defaultvalue = f"[{taskname}: {name}]"
    return req.xstring(
        taskname,
        name,
        defaultvalue,
        provide_default_if_none=provide_default_if_none,
    )

3441 

3442 @classmethod 

def make_options_from_xstrings(
    cls,
    req: "CamcopsRequest",
    prefix: str,
    first: int,
    last: int,
    suffix: str = "",
) -> Dict[int, str]:
    """
    Builds a lookup dictionary from xstrings.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        prefix: prefix for xstring
        first: first value (inclusive)
        last: last value (inclusive); may be below ``first``, in which
            case entries are created in descending order
        suffix: optional suffix

    Returns:
        dict: Each entry maps ``value`` to an xstring named
        ``<PREFIX><VALUE><SUFFIX>``.

    """
    # Count down when first > last; otherwise count up. Both ends are
    # included either way.
    step = -1 if first > last else 1
    return {
        value: cls.xstring(req, f"{prefix}{value}{suffix}")
        for value in range(first, last + step, step)
    }

3474 

3475 @staticmethod 

def make_options_from_numbers(first: int, last: int) -> Dict[int, str]:
    """
    Builds a simple dictionary mapping each number in a range to its
    string version. Usually for subsequent (more interesting) processing!

    Args:
        first: first value (inclusive)
        last: last value (inclusive); may be below ``first``, in which
            case entries are created in descending order

    Returns:
        dict

    """
    # Count down when first > last; otherwise count up. Both ends are
    # included either way.
    step = -1 if first > last else 1
    return {value: str(value) for value in range(first, last + step, step)}

3497 

3498 

3499# ============================================================================= 

3500# Collating all task tables for specific purposes 

3501# ============================================================================= 

3502# Function, staticmethod, classmethod? 

3503# https://stackoverflow.com/questions/8108688/in-python-when-should-i-use-a-function-instead-of-a-method # noqa 

3504# https://stackoverflow.com/questions/11788195/module-function-vs-staticmethod-vs-classmethod-vs-no-decorators-which-idiom-is # noqa 

3505# https://stackoverflow.com/questions/15017734/using-static-methods-in-python-best-practice # noqa 

3506 

3507 

def all_task_tables_with_min_client_version() -> Dict[str, Version]:
    """
    Merges, across all task classes, each class's mapping from its tables
    to the minimum client version.

    Used by
    :func:`camcops_server.cc_modules.client_api.all_tables_with_min_client_version`.

    """
    table_versions = {}  # type: Dict[str, Version]
    # Collect every subclass before merging any of their mappings.
    task_classes = list(Task.gen_all_subclasses())
    for task_class in task_classes:
        per_task = task_class.all_tables_with_min_client_version()  # type: ignore[attr-defined]  # noqa: E501
        table_versions.update(per_task)
    return table_versions

3522 

3523 

3524@cache_region_static.cache_on_arguments(function_key_generator=fkg) 

def tablename_to_task_class_dict() -> Dict[str, Type[Task]]:
    """
    Builds a mapping from each task class's ``tablename`` (its base
    table) to the class itself.
    """
    return {
        task_class.tablename: task_class  # type: ignore[attr-defined]
        for task_class in Task.gen_all_subclasses()
    }

3533 

3534 

3535@cache_region_static.cache_on_arguments(function_key_generator=fkg) 

def all_task_tablenames() -> List[str]:
    """
    Returns all task base table names (the keys of
    :func:`tablename_to_task_class_dict`).
    """
    by_tablename = tablename_to_task_class_dict()
    return [tablename for tablename in by_tablename.keys()]

3542 

3543 

3544@cache_region_static.cache_on_arguments(function_key_generator=fkg) 

def all_task_classes() -> List[Type[Task]]:
    """
    Returns all task classes (the values of
    :func:`tablename_to_task_class_dict`), i.e. one class per task base
    table name.
    """
    d = tablename_to_task_class_dict()
    return list(d.values())

3551 

3552 

3553# ============================================================================= 

3554# Support functions 

3555# ============================================================================= 

3556 

3557 

def get_from_dict(d: Dict, key: Any, default: Any = INVALID_VALUE) -> Any:
    """
    Looks up a key in a dictionary. This is just ``d.get(key, default)``;
    what it adds in practice is a standard value (``INVALID_VALUE``) for
    ``default``.

    Args:
        d: the dictionary
        key: the key
        default: value to return if ``key`` is not in ``d``
    """
    value = d.get(key, default)
    return value