Coverage for cc_modules/cc_exportmodels.py: 33%

464 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

1""" 

2camcops_server/cc_modules/cc_exportmodels.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Define models for export functions (e.g. HL7, file-based export).** 

27 

28""" 

29 

30import datetime 

31import logging 

32import os 

33import posixpath 

34import socket 

35import subprocess 

36import sys 

37from typing import Any, Generator, List, Optional, Tuple, TYPE_CHECKING 

38 

39from cardinal_pythonlib.datetimefunc import ( 

40 get_now_utc_datetime, 

41 get_now_utc_pendulum, 

42) 

43from cardinal_pythonlib.email.sendmail import ( 

44 CONTENT_TYPE_HTML, 

45 CONTENT_TYPE_TEXT, 

46) 

47from cardinal_pythonlib.fileops import mkdir_p 

48from cardinal_pythonlib.logs import BraceStyleAdapter 

49from cardinal_pythonlib.network import ping 

50from cardinal_pythonlib.sqlalchemy.list_types import StringListType 

51from cardinal_pythonlib.sqlalchemy.orm_query import bool_from_exists_clause 

52import hl7 

53from pendulum import DateTime as Pendulum 

54from sqlalchemy.orm import ( 

55 Mapped, 

56 mapped_column, 

57 reconstructor, 

58 relationship, 

59 Session as SqlASession, 

60) 

61from sqlalchemy.sql.schema import ForeignKey 

62from sqlalchemy.sql.sqltypes import ( 

63 BigInteger, 

64 Text, 

65 UnicodeText, 

66) 

67 

68from camcops_server.cc_modules.cc_constants import ( 

69 ConfigParamExportRecipient, 

70 FileType, 

71 UTF8, 

72) 

73from camcops_server.cc_modules.cc_email import Email 

74from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient 

75from camcops_server.cc_modules.cc_exportrecipientinfo import ( 

76 ExportTransmissionMethod, 

77) 

78from camcops_server.cc_modules.cc_fhir import ( 

79 FhirExportException, 

80 FhirTaskExporter, 

81) 

82from camcops_server.cc_modules.cc_filename import change_filename_ext 

83from camcops_server.cc_modules.cc_hl7 import ( 

84 make_msh_segment, 

85 MLLPTimeoutClient, 

86 msg_is_successful_ack, 

87 SEGMENT_SEPARATOR, 

88) 

89from camcops_server.cc_modules.cc_redcap import ( 

90 RedcapExportException, 

91 RedcapTaskExporter, 

92) 

93from camcops_server.cc_modules.cc_sqla_coltypes import ( 

94 LongText, 

95 TableNameColType, 

96) 

97from camcops_server.cc_modules.cc_sqlalchemy import Base 

98from camcops_server.cc_modules.cc_taskcollection import ( 

99 TaskCollection, 

100 TaskSortMethod, 

101) 

102from camcops_server.cc_modules.cc_taskfactory import ( 

103 task_factory_no_security_checks, 

104) 

105 

106if TYPE_CHECKING: 

107 from camcops_server.cc_modules.cc_request import CamcopsRequest 

108 from camcops_server.cc_modules.cc_task import Task 

109 

110log = BraceStyleAdapter(logging.getLogger(__name__)) 

111 

112 

113# ============================================================================= 

114# Constants 

115# ============================================================================= 

116 

117DOS_NEWLINE = "\r\n" 

118 

119 

120# ============================================================================= 

121# Create task collections for export 

122# ============================================================================= 

123 

124 

def get_collection_for_export(
    req: "CamcopsRequest",
    recipient: ExportRecipient,
    via_index: bool = True,
    debug: bool = False,
) -> TaskCollection:
    """
    Builds the task collection to be exported to a given recipient.

    The collection is restricted to current tasks, is sorted (within each
    task class) by ascending creation date, and is told which export
    recipient it is for. Per the original documentation of this function,
    that yields the tasks that are wanted by the recipient and, for
    incremental exports, have not already been sent to a recipient of the
    same name (even if other aspects of that recipient have changed).

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
        via_index: use the task index (faster)?
        debug: log the recipient and the resulting collection?

    Returns:
        a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`
    """  # noqa
    if not via_index:
        log.debug("Task index disabled for get_collection_for_export()")

    task_collection = TaskCollection(
        req=req,
        export_recipient=recipient,
        via_index=via_index,
        current_only=True,
        sort_method_by_class=TaskSortMethod.CREATION_DATE_ASC,
    )
    if not debug:
        return task_collection

    log.debug(
        "get_collection_for_export(): recipient={!r}, collection={!r}",
        recipient,
        task_collection,
    )
    return task_collection

165 

166 

def gen_exportedtasks(
    collection: TaskCollection,
) -> Generator["ExportedTask", None, None]:
    """
    Yields one new :class:`ExportedTask` per task in a collection.

    Each :class:`ExportedTask` is added to the collection's database session
    before it is yielded.

    Args:
        collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`
            that has an export recipient set

    Yields:
        :class:`ExportedTask` objects

    """  # noqa
    session = collection.dbsession
    export_recipient = collection.export_recipient
    assert (
        export_recipient is not None
    ), "TaskCollection has no export_recipient"
    for each_task in collection.gen_tasks_by_class():
        exported = ExportedTask(export_recipient, each_task)
        session.add(exported)
        yield exported

187 

188 

def gen_tasks_having_exportedtasks(
    collection: TaskCollection,
) -> Generator["Task", None, None]:
    """
    Yields the tasks of a collection, logging each one as exported.

    Used for database exports.

    Args:
        collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`

    Yields:
        :class:`camcops_server.cc_modules.cc_task.Task` objects

    """  # noqa
    for exported in gen_exportedtasks(collection):
        yield exported.task
        # Reached only when the consumer asks for the next task, i.e. after
        # it has finished with the one just yielded.
        exported.succeed()

207 

208 

209# ============================================================================= 

210# ExportedTask class 

211# ============================================================================= 

212 

213 

class ExportedTask(Base):
    """
    Class representing an attempt to export a task (as part of a
    :class:`ExportRun`) to a specific
    :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`.
    """

    __tablename__ = "_exported_tasks"

    id: Mapped[int] = mapped_column(
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    recipient_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportRecipient.id),
        comment=f"FK to {ExportRecipient.__tablename__}.{ExportRecipient.id.name}",  # noqa
    )
    basetable: Mapped[str] = mapped_column(
        TableNameColType,
        index=True,
        comment="Base table of task concerned",
    )
    task_server_pk: Mapped[int] = mapped_column(
        index=True,
        comment="Server PK of task in basetable (_pk field)",
    )
    start_at_utc: Mapped[datetime.datetime] = mapped_column(
        index=True,
        comment="Time export was started (UTC)",
    )
    finish_at_utc: Mapped[Optional[datetime.datetime]] = mapped_column(
        comment="Time export was finished (UTC)"
    )
    success: Mapped[bool] = mapped_column(
        default=False,
        comment="Task exported successfully?",
    )
    failure_reasons: Mapped[Optional[List[str]]] = mapped_column(
        "failure_reasons", StringListType, comment="Reasons for failure"
    )
    cancelled: Mapped[bool] = mapped_column(
        default=False,
        comment="Export subsequently cancelled/invalidated (may trigger resend)",  # noqa
    )
    cancelled_at_utc: Mapped[Optional[datetime.datetime]] = mapped_column(
        comment="Time export was cancelled at (UTC)",
    )

    recipient = relationship(ExportRecipient)

    emails = relationship("ExportedTaskEmail", back_populates="exported_task")
    fhir_exports = relationship(
        "ExportedTaskFhir", back_populates="exported_task"
    )
    filegroups = relationship(
        "ExportedTaskFileGroup", back_populates="exported_task"
    )
    hl7_messages = relationship(
        "ExportedTaskHL7Message", back_populates="exported_task"
    )
    redcap_exports = relationship(
        "ExportedTaskRedcap", back_populates="exported_task"
    )

    def __init__(
        self,
        recipient: Optional[ExportRecipient] = None,
        task: Optional["Task"] = None,
        basetable: Optional[str] = None,
        task_server_pk: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        """
        Can initialize with a task, or a basetable/task_server_pk combination.
        The start time is set to "now" (UTC) at construction.

        Args:
            recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
            task: a :class:`camcops_server.cc_modules.cc_task.Task` object
            basetable: base table name of the task
            task_server_pk: server PK of the task
            **kwargs: passed to the base class constructor

        (However, we must also support a no-parameter constructor, not least
        for our :func:`merge_db` function.)
        """  # noqa
        super().__init__(**kwargs)
        self.recipient = recipient
        self.start_at_utc = get_now_utc_datetime()
        if task:
            assert (
                not basetable
            ) and task_server_pk is None, (
                "Task specified; mustn't specify basetable/task_server_pk"
            )
            self.basetable = task.tablename
            self.task_server_pk = task.pk
            self._task = task
        else:
            self.basetable = basetable
            self.task_server_pk = task_server_pk
            self._task = None

    @reconstructor
    def init_on_load(self) -> None:
        """
        Called when SQLAlchemy recreates an object; see
        https://docs.sqlalchemy.org/en/latest/orm/constructors.html.
        """
        self._task = None

    @property
    def task(self) -> Optional["Task"]:
        """
        Returns the associated task, fetching it (via ``basetable`` and
        ``task_server_pk``) if we don't already hold it.

        Returns ``None``, after logging a warning, if the lookup raises
        :exc:`KeyError`. A failed lookup is not cached, so it is retried on
        the next access.
        """
        if self._task is None:
            dbsession = SqlASession.object_session(self)
            try:
                self._task = task_factory_no_security_checks(
                    dbsession, self.basetable, self.task_server_pk
                )
            except KeyError:
                log.warning(
                    "Failed to retrieve task for basetable={!r}, PK={!r}",
                    self.basetable,
                    self.task_server_pk,
                )
                self._task = None
        return self._task

    def succeed(self) -> None:
        """
        Register success, and record the finish time.
        """
        self.success = True
        self.finish()

    def abort(self, msg: str) -> None:
        """
        Record failure, and why; also logs the error and records the finish
        time.

        (Called ``abort`` not ``fail`` because PyCharm has a bug relating to
        functions named ``fail``:
        https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

        Args:
            msg: why
        """
        self.success = False
        log.error("Task export failed: {}", msg)
        self._add_failure_reason(msg)
        self.finish()

    def _add_failure_reason(self, msg: str) -> None:
        """
        Writes to our ``failure_reasons`` list in a way that (a) obviates the
        need to create an empty list via ``__init__()``, and (b) will
        definitely mark it as dirty, so it gets saved to the database.

        See :class:`cardinal_pythonlib.sqlalchemy.list_types.StringListType`.

        Args:
            msg: the message
        """
        if self.failure_reasons is None:
            self.failure_reasons = [msg]
        else:
            # Do not use .append(); that won't mark the record as dirty.
            # Don't use "+="; similarly, that calls list.__iadd__(), not
            # InstrumentedAttribute.__set__().
            # noinspection PyAugmentAssignment
            self.failure_reasons = self.failure_reasons + [msg]

    def finish(self) -> None:
        """
        Records the finish time (now, UTC).
        """
        self.finish_at_utc = get_now_utc_datetime()

    def export(self, req: "CamcopsRequest") -> None:
        """
        Performs an export of the specific task, via whichever transmission
        method our recipient specifies.

        If the task cannot be found, the export is aborted (and the failure
        recorded) rather than handing ``None`` to the exporters.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            AssertionError: if the recipient's transmission method is unknown
        """
        dbsession = req.dbsession
        recipient = self.recipient
        transmission_method = recipient.transmission_method
        # Look the task up once: a failed lookup is retried (and re-logged)
        # on every access to self.task.
        task = self.task
        log.info("Exporting task {!r} to recipient {}", task, recipient)

        if task is None:
            self.abort(
                f"Task not found: basetable={self.basetable!r}, "
                f"PK={self.task_server_pk!r}"
            )
            return

        if transmission_method == ExportTransmissionMethod.EMAIL:
            email = ExportedTaskEmail(self)
            dbsession.add(email)
            email.export_task(req)

        elif transmission_method == ExportTransmissionMethod.FHIR:
            efhir = ExportedTaskFhir(self)
            dbsession.add(efhir)
            dbsession.flush()
            efhir.export_task(req)

        elif transmission_method == ExportTransmissionMethod.FILE:
            efg = ExportedTaskFileGroup(self)
            dbsession.add(efg)
            efg.export_task(req)

        elif transmission_method == ExportTransmissionMethod.HL7:
            ehl7 = ExportedTaskHL7Message(self)
            if ehl7.valid():
                dbsession.add(ehl7)
                ehl7.export_task(req)
            else:
                self.abort("Task not valid for HL7 export")

        elif transmission_method == ExportTransmissionMethod.REDCAP:
            eredcap = ExportedTaskRedcap(self)
            dbsession.add(eredcap)
            eredcap.export_task(req)

        else:
            raise AssertionError("Bug: bad transmission_method")

    @property
    def filegroup(self) -> "ExportedTaskFileGroup":
        """
        Returns a :class:`ExportedTaskFileGroup`, creating it if necessary.

        If we already have one or more file groups, the first is returned;
        otherwise a new one is created and appended to ``filegroups``.
        """
        if self.filegroups:
            # noinspection PyUnresolvedReferences
            filegroup = self.filegroups[0]  # type: ExportedTaskFileGroup
        else:
            filegroup = ExportedTaskFileGroup(self)
            # noinspection PyUnresolvedReferences
            self.filegroups.append(filegroup)
        return filegroup

    def export_file(
        self,
        filename: str,
        text: Optional[str] = None,
        binary: Optional[bytes] = None,
        text_encoding: str = UTF8,
    ) -> bool:
        """
        Exports a file, via our file group (see :attr:`filegroup`).

        Args:
            filename: name of the file to write
            text: text contents (specify this XOR ``binary``)
            binary: binary contents (specify this XOR ``text``)
            text_encoding: encoding to use when writing text

        Returns: was it exported?
        """
        filegroup = self.filegroup
        return filegroup.export_file(
            filename=filename,
            text=text,
            binary=binary,
            text_encoding=text_encoding,
        )

    def cancel(self) -> None:
        """
        Marks the task export as cancelled/invalidated, and records when.

        May trigger a resend (which is the point).
        """
        self.cancelled = True
        self.cancelled_at_utc = get_now_utc_datetime()

    @classmethod
    def task_already_exported(
        cls,
        dbsession: SqlASession,
        recipient_name: str,
        basetable: str,
        task_pk: int,
    ) -> bool:
        """
        Has the specified task already been successfully exported (and that
        export not subsequently cancelled)?

        Args:
            dbsession: a :class:`sqlalchemy.orm.session.Session`
            recipient_name: name of the export recipient
            basetable: name of the task's base table
            task_pk: server PK of the task

        Returns:
            does a successful, non-cancelled export record exist for this
            task and a recipient of this name?

        """
        exists_q = (
            dbsession.query(cls)
            .join(cls.recipient)
            .filter(ExportRecipient.recipient_name == recipient_name)
            .filter(cls.basetable == basetable)
            .filter(cls.task_server_pk == task_pk)
            .filter(cls.success == True)  # noqa: E712
            .filter(cls.cancelled == False)  # noqa: E712
            .exists()
        )
        return bool_from_exists_clause(dbsession, exists_q)

520 

521 

522# ============================================================================= 

523# HL7 export 

524# ============================================================================= 

525 

526 

class ExportedTaskHL7Message(Base):
    """
    Represents an individual HL7 message.
    """

    __tablename__ = "_exported_task_hl7msg"

    id: Mapped[int] = mapped_column(
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportedTask.id),
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )
    sent_at_utc: Mapped[Optional[datetime.datetime]] = mapped_column(
        comment="Time message was sent at (UTC)"
    )
    reply_at_utc: Mapped[Optional[datetime.datetime]] = mapped_column(
        comment="Time message was replied to (UTC)"
    )
    success: Mapped[Optional[bool]] = mapped_column(
        comment="Message sent successfully and acknowledged by HL7 server",
    )
    failure_reason: Mapped[Optional[str]] = mapped_column(
        Text, comment="Reason for failure"
    )
    message: Mapped[Optional[str]] = mapped_column(
        LongText, comment="Message body, if kept"
    )
    reply: Mapped[Optional[str]] = mapped_column(
        Text, comment="Server's reply, if kept"
    )

    exported_task = relationship(ExportedTask)

    def __init__(
        self, exported_task: Optional[ExportedTask] = None, **kwargs: Any
    ) -> None:
        """
        Must support parameter-free construction, not least for
        :func:`merge_db`.

        Args:
            exported_task: the :class:`ExportedTask` this message belongs to
            **kwargs: passed to the base class constructor
        """
        super().__init__(**kwargs)
        self.exported_task = exported_task

        self._hl7_msg = None  # type: Optional[hl7.Message]

    @reconstructor
    def init_on_load(self) -> None:
        """
        Called when SQLAlchemy recreates an object; see
        https://docs.sqlalchemy.org/en/latest/orm/constructors.html.
        """
        self._hl7_msg = None

    @staticmethod
    def task_acceptable_for_hl7(
        recipient: ExportRecipient, task: "Task"
    ) -> bool:
        """
        Is the task valid for HL7 export? (For example, anonymous tasks and
        tasks missing key ID information may not be.)

        Args:
            recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
            task: a :class:`camcops_server.cc_modules.cc_task.Task` object
                (a missing task is treated as not acceptable)

        Returns:
            bool: valid?

        """  # noqa
        if not task:
            return False
        if task.is_anonymous:
            return False  # Cannot send anonymous tasks via HL7
        patient = task.patient
        if not patient:
            return False
        if not recipient.primary_idnum:
            return False  # required for HL7
        if not patient.has_idnum_type(recipient.primary_idnum):
            return False
        return True

    def valid(self) -> bool:
        """
        Checks for internal validity; returns a bool.
        """
        exported_task = self.exported_task
        task = exported_task.task
        recipient = exported_task.recipient
        return self.task_acceptable_for_hl7(recipient, task)

    def succeed(self, now: Optional[Pendulum] = None) -> None:
        """
        Record that we succeeded, and so did our associated task export.

        Args:
            now: the time to record as the sending time; if not supplied,
                the current UTC time is used
        """
        now = now or get_now_utc_datetime()
        self.success = True
        self.sent_at_utc = now
        self.exported_task.succeed()

    def abort(self, msg: str, diverted_not_sent: bool = False) -> None:
        """
        Record that we failed, and so did our associated task export.

        The detailed reason is stored with this message; the associated
        task export is given one of two fixed summary reasons.

        (Called ``abort`` not ``fail`` because PyCharm has a bug relating to
        functions named ``fail``:
        https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

        Args:
            msg: reason for failure
            diverted_not_sent: deliberately diverted (and not counted as sent)
                rather than a sending failure?
        """
        self.success = False
        self.failure_reason = msg
        self.exported_task.abort(
            "HL7 message deliberately not sent; diverted to file"
            if diverted_not_sent
            else "HL7 sending failed"
        )

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Exports the task itself to an HL7 message: builds the message, then
        either diverts it to a file (debugging option) or transmits it.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        if not self.valid():
            self.abort(
                "Unsuitable for HL7; should have been filtered out earlier"
            )
            return
        self.make_hl7_message(req)
        recipient = self.exported_task.recipient
        if recipient.hl7_debug_divert_to_file:
            self.divert_to_file(req)
        else:
            # Proper HL7 message
            self.transmit_hl7()

    def divert_to_file(self, req: "CamcopsRequest") -> None:
        """
        Write an HL7 message to a file. For debugging.

        If the file is written, the recipient's settings determine whether
        that counts as a successful send or as a (deliberate) failure.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        exported_task = self.exported_task
        recipient = exported_task.recipient
        filename = recipient.get_filename(
            req, exported_task.task, override_task_format="hl7"
        )
        now_utc = get_now_utc_pendulum()
        log.info("Diverting HL7 message to file {!r}", filename)
        written = exported_task.export_file(
            filename=filename, text=str(self._hl7_msg)
        )
        if not written:
            return

        if recipient.hl7_debug_treat_diverted_as_sent:
            # succeed() records now_utc as the sending time.
            self.succeed(now_utc)
        else:
            self.abort(
                "Exported to file as requested but not sent via HL7",
                diverted_not_sent=True,
            )

    def make_hl7_message(self, req: "CamcopsRequest") -> None:
        """
        Makes an HL7 message and stores it in ``self._hl7_msg``.

        May also store it in ``self.message`` (which is saved to the database),
        if we're saving HL7 messages.

        Our primary key is used as the HL7 message control ID, so if it has
        not yet been assigned, the database session is flushed first.

        See

        - https://python-hl7.readthedocs.org/en/latest/index.html

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        task = self.exported_task.task
        recipient = self.exported_task.recipient

        if self.id is None:
            # An autoincrement PK is only populated once the object has been
            # flushed; without this, the control ID would be the string
            # "None". NOTE(review): assumes a mid-export flush is acceptable
            # here, as it is for FHIR exports in ExportedTask.export().
            dbsession = SqlASession.object_session(self)
            if dbsession is not None:
                dbsession.flush()

        # ---------------------------------------------------------------------
        # Parts
        # ---------------------------------------------------------------------
        msh_segment = make_msh_segment(
            message_datetime=req.now, message_control_id=str(self.id)
        )
        pid_segment = task.get_patient_hl7_pid_segment(req, recipient)
        other_segments = task.get_hl7_data_segments(req, recipient)

        # ---------------------------------------------------------------------
        # Whole message
        # ---------------------------------------------------------------------
        segments = [msh_segment, pid_segment] + other_segments
        self._hl7_msg = hl7.Message(SEGMENT_SEPARATOR, segments)
        if recipient.hl7_keep_message:
            self.message = str(self._hl7_msg)

    def transmit_hl7(self) -> None:
        """
        Sends the HL7 message over TCP/IP, and records the outcome
        (success, or failure and why).

        - Default MLLP/HL7 port is 2575
        - MLLP = minimum lower layer protocol

          - https://www.cleo.com/support/byproduct/lexicom/usersguide/mllp_configuration.htm
          - https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml?search=hl7
          - Essentially just a TCP socket with a minimal wrapper:
            https://stackoverflow.com/questions/11126918

        - https://python-hl7.readthedocs.org/en/latest/api.html; however,
          we've modified that
        """  # noqa
        recipient = self.exported_task.recipient

        if recipient.hl7_ping_first:
            pinged = self.ping_hl7_server(recipient)
            if not pinged:
                self.abort("Could not ping HL7 host")
                return

        try:
            log.info(
                "Sending HL7 message to {}:{}",
                recipient.hl7_host,
                recipient.hl7_port,
            )
            with MLLPTimeoutClient(
                recipient.hl7_host,
                recipient.hl7_port,
                recipient.hl7_network_timeout_ms,
            ) as client:
                server_replied, reply = client.send_message(self._hl7_msg)
        except socket.timeout:
            self.abort("Failed to send message via MLLP: timeout")
            return
        except Exception as e:
            self.abort(f"Failed to send message via MLLP: {e}")
            return

        if not server_replied:
            self.abort("No response from server")
            return

        self.reply_at_utc = get_now_utc_datetime()
        if recipient.hl7_keep_reply:
            self.reply = reply

        try:
            replymsg = hl7.parse(reply)
        except Exception as e:
            self.abort(f"Malformed reply: {e}")
            return

        success, failure_reason = msg_is_successful_ack(replymsg)
        if success:
            self.succeed()
        else:
            self.abort(failure_reason)

    @staticmethod
    def ping_hl7_server(recipient: ExportRecipient) -> bool:
        # noinspection HttpUrlsUsage
        """
        Performs a TCP/IP ping on our HL7 server; returns success. Logs an
        error on failure.

        (No HL7 PING method yet. Proposal is
        http://hl7tsc.org/wiki/index.php?title=FTSD-ConCalls-20081028
        So use TCP/IP ping.)

        Args:
            recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`

        Returns:
            bool: success

        """  # noqa
        # Whole seconds, but never less than one: a sub-second network
        # timeout would otherwise become a zero-second ping timeout.
        timeout_s = max(recipient.hl7_network_timeout_ms // 1000, 1)
        if ping(hostname=recipient.hl7_host, timeout_s=timeout_s):
            return True
        else:
            log.error("Failed to ping {!r}", recipient.hl7_host)
            return False

821 

822 

823# ============================================================================= 

824# File export 

825# ============================================================================= 

826 

827 

828class ExportedTaskFileGroup(Base): 

829 """ 

830 Represents a small set of files exported in relation to a single task. 

831 """ 

832 

833 __tablename__ = "_exported_task_filegroup" 

834 

835 id: Mapped[int] = mapped_column( 

836 BigInteger, 

837 primary_key=True, 

838 autoincrement=True, 

839 comment="Arbitrary primary key", 

840 ) 

841 exported_task_id: Mapped[int] = mapped_column( 

842 BigInteger, 

843 ForeignKey(ExportedTask.id), 

844 comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}", 

845 ) 

846 filenames: Mapped[Optional[List[str]]] = mapped_column( 

847 StringListType, comment="List of filenames exported" 

848 ) 

849 script_called: Mapped[bool] = mapped_column( 

850 default=False, 

851 comment=( 

852 f"Was the {ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} " 

853 f"script called?" 

854 ), 

855 ) 

856 script_retcode: Mapped[Optional[int]] = mapped_column( 

857 comment=( 

858 f"Return code from the " 

859 f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script" 

860 ), 

861 ) 

862 script_stdout: Mapped[Optional[str]] = mapped_column( 

863 UnicodeText, 

864 comment=( 

865 f"stdout from the " 

866 f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script" 

867 ), 

868 ) 

869 script_stderr: Mapped[Optional[str]] = mapped_column( 

870 UnicodeText, 

871 comment=( 

872 f"stderr from the " 

873 f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script" 

874 ), 

875 ) 

876 

877 exported_task = relationship(ExportedTask) 

878 

879 def __init__( 

880 self, exported_task: ExportedTask = None, **kwargs: Any 

881 ) -> None: 

882 """ 

883 Args: 

884 exported_task: :class:`ExportedTask` object 

885 """ 

886 super().__init__(**kwargs) 

887 self.exported_task = exported_task 

888 

889 def export_file( 

890 self, 

891 filename: str, 

892 text: str = None, 

893 binary: bytes = None, 

894 text_encoding: str = UTF8, 

895 ) -> bool: 

896 """ 

897 Exports the file. 

898 

899 Args: 

900 filename: 

901 text: text contents (specify this XOR ``binary``) 

902 binary: binary contents (specify this XOR ``text``) 

903 text_encoding: encoding to use when writing text 

904 

905 Returns: 

906 bool: was it exported? 

907 """ 

908 assert bool(text) != bool(binary), "Specify text XOR binary" 

909 exported_task = self.exported_task 

910 filename = os.path.abspath(filename) 

911 directory = os.path.dirname(filename) 

912 recipient = exported_task.recipient 

913 

914 if not recipient.file_overwrite_files and os.path.isfile(filename): 

915 self.abort(f"File already exists: {filename!r}") 

916 return False 

917 

918 if recipient.file_make_directory: 

919 try: 

920 mkdir_p(directory) 

921 except Exception as e: 

922 self.abort(f"Couldn't make directory {directory!r}: {e}") 

923 return False 

924 

925 try: 

926 log.debug("Writing to {!r}", filename) 

927 if text: 

928 with open(filename, mode="w", encoding=text_encoding) as f: 

929 f.write(text) 

930 else: 

931 with open(filename, mode="wb") as f: 

932 f.write(binary) 

933 except Exception as e: 

934 self.abort(f"Failed to open or write file {filename!r}: {e}") 

935 return False 

936 

937 self.note_exported_file(filename) 

938 return True 

939 

940 def note_exported_file(self, *filenames: str) -> None: 

941 """ 

942 Records a filename that has been exported, or several. 

943 

944 Args: 

945 *filenames: filenames 

946 """ 

947 if self.filenames is None: 

948 self.filenames = list(filenames) 

949 else: 

950 # See ExportedTask._add_failure_reason() above: 

951 # noinspection PyAugmentAssignment,PyTypeChecker 

952 self.filenames = self.filenames + list(filenames) 

953 

def export_task(self, req: "CamcopsRequest") -> None:
    """
    Write the task to disk in the recipient's configured format.

    Steps, in order:

    1. If the recipient forbids overwriting, abort as soon as any target
       file (the task file and, if requested, the RiO metadata file) is
       found to exist already. This happens before any rendering.
    2. Render the task as PDF, HTML or XML and write it.
    3. If requested, write the RiO metadata file next to it.
    4. Hand over to :meth:`finish_run_script_if_necessary`.

    The method returns early, without running the script step, if either
    write reports that nothing was written.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

    Raises:
        AssertionError: if the recipient's task format is not one of PDF,
            HTML or XML.
    """
    exported_task = self.exported_task
    task = exported_task.task
    recipient = exported_task.recipient
    task_format = recipient.task_format
    task_filename = recipient.get_filename(req, task)
    # Computed up front in case it is needed; RiO metadata filenames must
    # not contain spaces.
    metadata_name_with_spaces = change_filename_ext(task_filename, ".metadata")
    rio_metadata_filename = metadata_name_with_spaces.replace(" ", "")

    # Cheap pre-check for clashes, done before the (expensive) rendering.
    if not recipient.file_overwrite_files:
        candidates = [task_filename]
        if recipient.file_export_rio_metadata:
            candidates.append(rio_metadata_filename)
        clash = next(
            (
                candidate
                for candidate in candidates
                if os.path.isfile(os.path.abspath(candidate))
            ),
            None,
        )
        if clash is not None:
            self.abort(f"File already exists: {clash!r}")
            return

    # Render the task: exactly one of "binary" and "text" gets content.
    binary = None
    text = None
    if task_format == FileType.PDF:
        binary = task.get_pdf(req)
    elif task_format == FileType.HTML:
        text = task.get_html(req)
    elif task_format == FileType.XML:
        text = task.get_xml(req)
    else:
        raise AssertionError("Unknown task_format")

    if not self.export_file(
        task_filename, text=text, binary=binary, text_encoding=UTF8
    ):
        return

    # Optional RiO metadata file.
    if recipient.file_export_rio_metadata:
        metadata_text = task.get_rio_metadata(
            req,
            recipient.rio_idnum,
            recipient.rio_uploading_user,
            recipient.rio_document_type,
        )
        # Written as bytes so that we control the newlines ourselves.
        # Servelec say CR = "\r", but DOS is \r\n, which is what we use.
        # ASCII because UTF-8 is NOT supported by RiO for metadata.
        metadata_bytes = metadata_text.replace("\n", DOS_NEWLINE).encode(
            "ascii"
        )
        if not self.export_file(rio_metadata_filename, binary=metadata_bytes):
            return

    self.finish_run_script_if_necessary()

1025 

def succeed(self) -> None:
    """
    Mark the export as successful by delegating to the parent exported-task
    record.
    """
    parent = self.exported_task
    parent.succeed()

1031 

def abort(self, msg: str) -> None:
    """
    Mark the export as failed, passing the reason to the parent
    exported-task record.

    (The name is ``abort`` rather than ``fail`` because of a PyCharm bug
    affecting functions named ``fail``:
    https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

    Args:
        msg: the reason for the failure
    """
    parent = self.exported_task
    parent.abort(msg)

1044 

def finish_run_script_if_necessary(self) -> None:
    """
    Completes the file export by running the external script, if required.

    If at least one file was written and the recipient specifies a
    post-export script, that script is run with the written filenames as its
    arguments. Its stdout, stderr and return code are recorded on this
    object, and the export is then marked as successful. The return code is
    recorded only; a non-zero value does not by itself mark a failure.

    If the script cannot be launched, the error text is stored in
    ``script_stderr`` and the export is aborted.

    If no script is needed, the export is simply marked as successful.
    """
    recipient = self.exported_task.recipient
    script = recipient.file_script_after_export
    if self.filenames and script:
        args = [script, *self.filenames]
        # Only launching/waiting is guarded. Deliberately broad: whatever
        # goes wrong here is recorded as an export failure rather than
        # propagated to the caller.
        try:
            p = subprocess.Popen(
                args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            out, err = p.communicate()
        except Exception as e:
            self.script_called = False
            self.script_stdout = ""
            self.script_stderr = str(e)
            self.abort("Failed to run script")
            return
        # The script did run. Decode leniently: undecodable bytes in its
        # output must not make a completed run look like a launch failure
        # (strict decoding used to raise here and discard the output).
        encoding = sys.getdefaultencoding()
        self.script_called = True
        self.script_stdout = out.decode(encoding, errors="replace")
        self.script_stderr = err.decode(encoding, errors="replace")
        self.script_retcode = p.returncode
    self.succeed()

1069 

1070 

1071# ============================================================================= 

1072# E-mail export 

1073# ============================================================================= 

1074 

1075 

class ExportedTaskEmail(Base):
    """
    Record of one e-mail export of a task: links an :class:`ExportedTask`
    to the :class:`Email` that carried it.
    """

    __tablename__ = "_exported_task_email"

    id: Mapped[int] = mapped_column(
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportedTask.id),
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )
    # Nullable: set when export_task() creates the e-mail.
    email_id: Mapped[Optional[int]] = mapped_column(
        BigInteger,
        ForeignKey(Email.id),
        comment=f"FK to {Email.__tablename__}.{Email.id.name}",
    )

    exported_task = relationship(ExportedTask)
    email = relationship(Email)

    def __init__(
        self, exported_task: ExportedTask = None, **kwargs: Any
    ) -> None:
        """
        Args:
            exported_task: the parent :class:`ExportedTask` object
            kwargs: passed on to the base class constructor
        """
        super().__init__(**kwargs)
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Render the task, attach it to a new e-mail, send that e-mail, and
        record success or failure on the parent :class:`ExportedTask`.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            AssertionError: if the recipient's task format is not one of
                PDF, HTML or XML.
        """
        exported_task = self.exported_task
        task = exported_task.task
        recipient = exported_task.recipient
        task_format = recipient.task_format
        # Attachment name only: a full path makes no sense in an e-mail.
        attachment_name = os.path.basename(recipient.get_filename(req, task))
        charset = "utf8"

        # Render the task as bytes, whatever the format.
        if task_format == FileType.PDF:
            payload = task.get_pdf(req)
        elif task_format == FileType.HTML:
            payload = task.get_html(req).encode(charset)
        elif task_format == FileType.XML:
            payload = task.get_xml(req).encode(charset)
        else:
            raise AssertionError("Unknown task_format")
        attachments = [
            (attachment_name, payload)
        ]  # type: List[Tuple[str, bytes]]

        if recipient.email_body_as_html:
            body_content_type = CONTENT_TYPE_HTML
        else:
            body_content_type = CONTENT_TYPE_TEXT

        # The date is filled in automatically by Email.
        message = Email(
            from_addr=recipient.email_from,
            sender=recipient.email_sender,
            reply_to=recipient.email_reply_to,
            to=recipient.email_to,
            cc=recipient.email_cc,
            bcc=recipient.email_bcc,
            subject=recipient.get_email_subject(req, task),
            body=recipient.get_email_body(req, task),
            content_type=body_content_type,
            charset=charset,
            attachments_binary=attachments,
            save_msg_string=recipient.email_keep_message,
        )
        self.email = message
        message.send(
            host=recipient.email_host,
            username=recipient.email_host_username,
            password=recipient.email_host_password,
            port=recipient.email_port,
            use_tls=recipient.email_use_tls,
        )

        if not message.sent:
            exported_task.abort("Failed to send e-mail")
            return
        exported_task.succeed()

1170 

1171 

1172# ============================================================================= 

1173# REDCap export 

1174# ============================================================================= 

1175 

1176 

class ExportedTaskRedcap(Base):
    """
    Represents an individual REDCap export.
    """

    __tablename__ = "_exported_task_redcap"

    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportedTask.id),
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )

    exported_task = relationship(ExportedTask)

    # We store these just as an audit trail

    # Stored as text (UnicodeText), so the Python-side type is str, not int.
    redcap_record_id: Mapped[Optional[str]] = mapped_column(
        UnicodeText,
        comment=(
            "ID of the (patient) record on the REDCap instance where "
            "this task has been exported"
        ),
    )

    redcap_instrument_name: Mapped[Optional[str]] = mapped_column(
        UnicodeText,
        comment=(
            "The name of the REDCap instrument name (form) where this "
            "task has been exported"
        ),
    )

    redcap_instance_id: Mapped[Optional[int]] = mapped_column(
        comment=(
            "1-based index of this particular task within the patient "
            "record. Increments on every repeat attempt."
        ),
    )

    def __init__(
        self, exported_task: ExportedTask = None, **kwargs: Any
    ) -> None:
        """
        Args:
            exported_task: :class:`ExportedTask` object
            kwargs: passed on to the base class constructor
        """
        super().__init__(**kwargs)
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Exports the task to REDCap, recording success on the parent
        :class:`ExportedTask`, or failure (with the exception text as the
        reason) if a :exc:`RedcapExportException` is raised.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        exported_task = self.exported_task
        exporter = RedcapTaskExporter()

        try:
            exporter.export_task(req, self)
            exported_task.succeed()
        except RedcapExportException as e:
            exported_task.abort(str(e))

1246 

1247 

1248# ============================================================================= 

1249# FHIR export 

1250# ============================================================================= 

1251 

1252 

class ExportedTaskFhir(Base):
    """
    Record of one FHIR export of a task. The individual resources sent are
    held as ``ExportedTaskFhirEntry`` rows, reachable via ``entries``.
    """

    __tablename__ = "_exported_task_fhir"

    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )

    exported_task_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportedTask.id),
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )

    exported_task = relationship(ExportedTask)

    # Referred to by name because the entry class is defined later.
    entries = relationship(
        "ExportedTaskFhirEntry", back_populates="exported_task_fhir"
    )

    def __init__(
        self, exported_task: ExportedTask = None, **kwargs: Any
    ) -> None:
        """
        Args:
            exported_task: the parent :class:`ExportedTask` object
            kwargs: passed on to the base class constructor
        """
        super().__init__(**kwargs)
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Send the task to the FHIR server and record the outcome on the
        parent :class:`ExportedTask`: success, or failure with the exception
        text as the reason if a :exc:`FhirExportException` is raised.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        parent = self.exported_task

        try:
            # Building the exporter is inside the "try" too, so a failure
            # there is recorded in the same way as a failure to export.
            FhirTaskExporter(req, self).export_task()
            parent.succeed()
        except FhirExportException as exc:
            parent.abort(str(exc))

1303 

1304 

class ExportedTaskFhirEntry(Base):
    """
    Details of Patients, Questionnaires, QuestionnaireResponses exported to
    a FHIR server for a single task.
    """

    __tablename__ = "_exported_task_fhir_entry"

    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )

    exported_task_fhir_id: Mapped[int] = mapped_column(
        ForeignKey(ExportedTaskFhir.id),
        comment=(
            f"FK to {ExportedTaskFhir.__tablename__}."
            f"{ExportedTaskFhir.id.name}"
        ),
    )

    etag: Mapped[Optional[str]] = mapped_column(
        UnicodeText, comment="The ETag for the resource (if relevant)"
    )

    last_modified: Mapped[Optional[datetime.datetime]] = mapped_column(
        comment="Server's date/time modified."
    )

    location: Mapped[Optional[str]] = mapped_column(
        UnicodeText,
        comment="The location (if the operation returns a location).",
    )

    status: Mapped[Optional[str]] = mapped_column(
        UnicodeText, comment="Status response code (text optional)."
    )

    # TODO: outcome?

    # Reciprocates ExportedTaskFhir.entries (which names this attribute in
    # its own back_populates), so that the two sides stay in step in Python
    # whichever one is assigned.
    exported_task_fhir = relationship(
        ExportedTaskFhir, back_populates="entries"
    )

    @property
    def location_url(self) -> str:
        """
        Puts the FHIR server API URL together with the returned location, so
        we can hyperlink to the resource.

        Returns:
            the combined URL, or an empty string if there is no location, if
            the chain of parent objects leading to the recipient is
            incomplete, or if the recipient has no FHIR API URL.
        """
        if not self.location:
            return ""
        try:
            api_url = (
                self.exported_task_fhir.exported_task.recipient.fhir_api_url
            )
        except AttributeError:
            # Some link in the parent chain is None.
            return ""
        if not api_url:
            # Nothing to join onto; posixpath.join() would raise on None.
            return ""
        # Avoid urllib.parse.urljoin; it does complex (and for our purposes
        # wrong) things. See
        # https://stackoverflow.com/questions/10893374/python-confusions-with-urljoin
        return posixpath.join(api_url, self.location)