Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2 

3""" 

4camcops_server/cc_modules/cc_exportmodels.py 

5 

6=============================================================================== 

7 

8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com). 

9 

10 This file is part of CamCOPS. 

11 

12 CamCOPS is free software: you can redistribute it and/or modify 

13 it under the terms of the GNU General Public License as published by 

14 the Free Software Foundation, either version 3 of the License, or 

15 (at your option) any later version. 

16 

17 CamCOPS is distributed in the hope that it will be useful, 

18 but WITHOUT ANY WARRANTY; without even the implied warranty of 

19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20 GNU General Public License for more details. 

21 

22 You should have received a copy of the GNU General Public License 

23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

24 

25=============================================================================== 

26 

27**Define models for export functions (e.g. HL7, file-based export).** 

28 

29""" 

30 

31import logging 

32import os 

33import socket 

34import subprocess 

35import sys 

36from typing import Generator, List, Optional, Tuple, TYPE_CHECKING 

37 

38from cardinal_pythonlib.datetimefunc import ( 

39 get_now_utc_datetime, 

40 get_now_utc_pendulum, 

41) 

42from cardinal_pythonlib.email.sendmail import ( 

43 CONTENT_TYPE_HTML, 

44 CONTENT_TYPE_TEXT, 

45) 

46from cardinal_pythonlib.fileops import mkdir_p 

47from cardinal_pythonlib.logs import BraceStyleAdapter 

48from cardinal_pythonlib.network import ping 

49from cardinal_pythonlib.sqlalchemy.list_types import StringListType 

50from cardinal_pythonlib.sqlalchemy.orm_query import bool_from_exists_clause 

51import hl7 

52from pendulum import DateTime as Pendulum 

53from sqlalchemy.orm import ( 

54 reconstructor, 

55 relationship, 

56 Session as SqlASession, 

57) 

58from sqlalchemy.sql.schema import Column, ForeignKey 

59from sqlalchemy.sql.sqltypes import ( 

60 BigInteger, 

61 Boolean, 

62 DateTime, 

63 Integer, 

64 Text, 

65 UnicodeText, 

66) 

67 

68from camcops_server.cc_modules.cc_constants import ( 

69 ConfigParamExportRecipient, 

70 FileType, 

71) 

72from camcops_server.cc_modules.cc_email import Email 

73from camcops_server.cc_modules.cc_exportrecipient import ( 

74 ExportRecipient, 

75) 

76from camcops_server.cc_modules.cc_exportrecipientinfo import ( 

77 ExportTransmissionMethod, 

78) 

79from camcops_server.cc_modules.cc_fhir import ( 

80 FhirExportException, 

81 FhirTaskExporter, 

82) 

83from camcops_server.cc_modules.cc_filename import ( 

84 change_filename_ext, 

85) 

86from camcops_server.cc_modules.cc_hl7 import ( 

87 make_msh_segment, 

88 MLLPTimeoutClient, 

89 msg_is_successful_ack, 

90 SEGMENT_SEPARATOR, 

91) 

92from camcops_server.cc_modules.cc_redcap import ( 

93 RedcapExportException, 

94 RedcapTaskExporter, 

95) 

96from camcops_server.cc_modules.cc_sqla_coltypes import ( 

97 LongText, 

98 TableNameColType, 

99) 

100from camcops_server.cc_modules.cc_sqlalchemy import Base 

101from camcops_server.cc_modules.cc_taskcollection import ( 

102 TaskCollection, 

103 TaskSortMethod, 

104) 

105from camcops_server.cc_modules.cc_taskfactory import task_factory_no_security_checks # noqa 

106 

107if TYPE_CHECKING: 

108 from camcops_server.cc_modules.cc_request import CamcopsRequest 

109 from camcops_server.cc_modules.cc_task import Task 

110 

# Module-level logger, wrapped in BraceStyleAdapter; log calls in this file
# pass "{}"-style placeholders plus arguments rather than pre-formatted text.
log = BraceStyleAdapter(logging.getLogger(__name__))

112 

113 

114# ============================================================================= 

115# Constants 

116# ============================================================================= 

117 

# CR+LF line ending; substituted for "\n" in RiO metadata before it is written.
DOS_NEWLINE = "\r\n"
# Default text encoding used by the export_file() methods in this module.
UTF8 = "utf8"

120 

121 

122# ============================================================================= 

123# Create task collections for export 

124# ============================================================================= 

125 

def get_collection_for_export(req: "CamcopsRequest",
                              recipient: ExportRecipient,
                              via_index: bool = True,
                              debug: bool = False) -> TaskCollection:
    """
    Builds the task collection to be exported to a given recipient.

    The collection is restricted to current tasks, sorted (within each task
    class) by ascending creation date, and is tied to the export recipient so
    that the collection can apply the recipient's own selection rules (for
    incremental exports, the intent is to omit tasks already sent to a
    recipient of the same name, even if other recipient details have changed).

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
        via_index: use the task index (faster)?
        debug: report details? (Note: reported at "critical" log level.)

    Returns:
        a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`
    """  # noqa
    if not via_index:
        log.debug("Task index disabled for get_collection_for_export()")

    collection_kwargs = dict(
        req=req,
        sort_method_by_class=TaskSortMethod.CREATION_DATE_ASC,
        current_only=True,
        via_index=via_index,
        export_recipient=recipient,
    )
    tasks_for_export = TaskCollection(**collection_kwargs)

    if debug:
        log.critical(
            "get_collection_for_export(): recipient={!r}, collection={!r}",
            recipient, tasks_for_export)
    return tasks_for_export

162 

163 

def gen_exportedtasks(collection: TaskCollection) \
        -> Generator["ExportedTask", None, None]:
    """
    Walks a task collection and, for each task, creates an
    :class:`ExportedTask` record, adds it to the collection's database
    session, and yields it.

    The collection must have an export recipient; this is asserted when the
    generator is first advanced.

    Args:
        collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`

    Yields:
        :class:`ExportedTask` objects

    """  # noqa
    session = collection.dbsession
    target = collection.export_recipient
    assert target is not None, "TaskCollection has no export_recipient"
    for current_task in collection.gen_tasks_by_class():
        export_record = ExportedTask(target, current_task)
        session.add(export_record)
        yield export_record

183 

184 

def gen_tasks_having_exportedtasks(collection: TaskCollection) \
        -> Generator["Task", None, None]:
    """
    Yields each task in a collection, logging its export as it goes.

    An export record is created for every task (via
    :func:`gen_exportedtasks`). The record is marked as successful only once
    the consumer resumes the generator after receiving the task, i.e. after
    the consumer has finished with it.

    Used for database exports.

    Args:
        collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`

    Yields:
        :class:`camcops_server.cc_modules.cc_task.Task` objects

    """  # noqa
    for export_record in gen_exportedtasks(collection):
        yield export_record.task
        # Reached only when the caller asks for the next task.
        export_record.succeed()

202 

203 

204# ============================================================================= 

205# ExportedTask class 

206# ============================================================================= 

207 

class ExportedTask(Base):
    """
    Class representing an attempt to export a task (as part of an
    :class:`ExportRun`) to a specific
    :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`.
    """
    __tablename__ = "_exported_tasks"

    # -------------------------------------------------------------------------
    # Columns
    # -------------------------------------------------------------------------
    id = Column(
        "id",
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key"
    )
    recipient_id = Column(
        "recipient_id",
        BigInteger,
        ForeignKey(ExportRecipient.id),
        nullable=False,
        comment=f"FK to {ExportRecipient.__tablename__}.{ExportRecipient.id.name}"  # noqa
    )
    basetable = Column(
        "basetable",
        TableNameColType,
        nullable=False,
        index=True,
        comment="Base table of task concerned"
    )
    task_server_pk = Column(
        "task_server_pk",
        Integer,
        nullable=False,
        index=True,
        comment="Server PK of task in basetable (_pk field)"
    )
    start_at_utc = Column(
        "start_at_utc",
        DateTime,
        nullable=False,
        index=True,
        comment="Time export was started (UTC)"
    )
    finish_at_utc = Column(
        "finish_at_utc",
        DateTime,
        comment="Time export was finished (UTC)"
    )
    success = Column(
        "success",
        Boolean,
        default=False,
        nullable=False,
        comment="Task exported successfully?"
    )
    failure_reasons = Column(
        "failure_reasons",
        StringListType,
        comment="Reasons for failure"
    )
    cancelled = Column(
        "cancelled",
        Boolean,
        default=False,
        nullable=False,
        comment="Export subsequently cancelled/invalidated (may trigger resend)"  # noqa
    )
    cancelled_at_utc = Column(
        "cancelled_at_utc",
        DateTime,
        comment="Time export was cancelled at (UTC)"
    )

    # -------------------------------------------------------------------------
    # Relationships
    # -------------------------------------------------------------------------
    recipient = relationship(ExportRecipient)

    hl7_messages = relationship("ExportedTaskHL7Message")
    filegroups = relationship("ExportedTaskFileGroup")
    emails = relationship("ExportedTaskEmail")

    def __init__(self,
                 recipient: ExportRecipient = None,
                 task: "Task" = None,
                 basetable: str = None,
                 task_server_pk: int = None,
                 *args, **kwargs) -> None:
        """
        Creates the record and stamps its start time. The task may be given
        directly, or identified by a basetable/task_server_pk pair (but not
        both).

        Args:
            recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
            task: a :class:`camcops_server.cc_modules.cc_task.Task` object
            basetable: base table name of the task
            task_server_pk: server PK of the task

        (A no-parameter constructor must also work, not least for our
        :func:`merge_db` function.)
        """  # noqa
        super().__init__(*args, **kwargs)
        self.recipient = recipient
        self.start_at_utc = get_now_utc_datetime()

        if not task:
            # Task identified indirectly (or not at all); load it lazily.
            self.basetable = basetable
            self.task_server_pk = task_server_pk
            self._task = None  # type: Optional[Task]
            return

        assert (not basetable) and task_server_pk is None, (
            "Task specified; mustn't specify basetable/task_server_pk"
        )
        self.basetable = task.tablename
        self.task_server_pk = task.pk
        self._task = task

    @reconstructor
    def init_on_load(self) -> None:
        """
        Hook run when SQLAlchemy rebuilds this object from the database
        (``__init__`` is not called then); resets the cached task. See
        https://docs.sqlalchemy.org/en/latest/orm/constructors.html.
        """
        self._task = None  # type: Optional[Task]

    @property
    def task(self) -> "Task":
        """
        Returns the associated task, fetching and caching it on first use.

        If the task factory raises ``KeyError``, a warning is logged and
        ``None`` is returned instead.
        """
        if self._task is not None:
            return self._task

        session = SqlASession.object_session(self)
        try:
            self._task = task_factory_no_security_checks(
                session, self.basetable, self.task_server_pk)
        except KeyError:
            log.warning(
                "Failed to retrieve task for basetable={!r}, PK={!r}",
                self.basetable,
                self.task_server_pk
            )
            self._task = None
        return self._task

    def succeed(self) -> None:
        """
        Marks the export as successful and records the finish time.
        """
        self.success = True
        self.finish()

    def abort(self, msg: str) -> None:
        """
        Marks the export as failed, logs and stores the reason, and records
        the finish time.

        (Called ``abort`` not ``fail`` because PyCharm has a bug relating to
        functions named ``fail``:
        https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

        Args:
            msg: why
        """
        self.success = False
        log.error("Task export failed: {}", msg)
        self._add_failure_reason(msg)
        self.finish()

    def _add_failure_reason(self, msg: str) -> None:
        """
        Adds a message to ``failure_reasons``.

        The attribute is always *reassigned* (never mutated in place with
        ``.append()`` or ``+=``), so that SQLAlchemy sees the change and saves
        it; this also copes with the attribute still being ``None``.

        See :class:`cardinal_pythonlib.sqlalchemy.list_types.StringListType`.

        Args:
            msg: the message
        """
        reasons_so_far = self.failure_reasons
        if reasons_so_far is None:
            self.failure_reasons = [msg]
            return
        # Build a new list and assign it: goes through
        # InstrumentedAttribute.__set__(), which marks the record dirty.
        self.failure_reasons = reasons_so_far + [msg]

    def finish(self) -> None:
        """
        Stamps the finish time with the current UTC time.
        """
        self.finish_at_utc = get_now_utc_datetime()

    def export(self, req: "CamcopsRequest") -> None:
        """
        Exports this task using whichever transmission method the recipient
        specifies, by creating the matching method-specific export record and
        delegating to it.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            AssertionError: if the transmission method is not recognized
        """
        session = req.dbsession
        recipient = self.recipient
        method = recipient.transmission_method
        log.info("Exporting task {!r} to recipient {}", self.task, recipient)

        if method == ExportTransmissionMethod.EMAIL:
            email_export = ExportedTaskEmail(self)
            session.add(email_export)
            email_export.export_task(req)

        elif method == ExportTransmissionMethod.FHIR:
            fhir_export = ExportedTaskFhir(self)
            session.add(fhir_export)
            session.flush()
            fhir_export.export_task(req)

        elif method == ExportTransmissionMethod.FILE:
            file_export = ExportedTaskFileGroup(self)
            session.add(file_export)
            file_export.export_task(req)

        elif method == ExportTransmissionMethod.HL7:
            hl7_export = ExportedTaskHL7Message(self)
            if not hl7_export.valid():
                self.abort("Task not valid for HL7 export")
            else:
                session.add(hl7_export)
                hl7_export.export_task(req)

        elif method == ExportTransmissionMethod.REDCAP:
            redcap_export = ExportedTaskRedcap(self)
            session.add(redcap_export)
            redcap_export.export_task(req)

        else:
            raise AssertionError("Bug: bad transmission_method")

    @property
    def filegroup(self) -> "ExportedTaskFileGroup":
        """
        Returns our first :class:`ExportedTaskFileGroup`, creating one (and
        attaching it to ``filegroups``) if there is none yet.
        """
        if self.filegroups:
            # noinspection PyUnresolvedReferences
            return self.filegroups[0]
        fresh_group = ExportedTaskFileGroup(self)
        # noinspection PyUnresolvedReferences
        self.filegroups.append(fresh_group)
        return fresh_group

    def export_file(self,
                    filename: str,
                    text: str = None,
                    binary: bytes = None,
                    text_encoding: str = UTF8) -> bool:
        """
        Exports a file via our file group (see :attr:`filegroup`).

        Args:
            filename:
            text: text contents (specify this XOR ``binary``)
            binary: binary contents (specify this XOR ``text``)
            text_encoding: encoding to use when writing text

        Returns: was it exported?
        """
        return self.filegroup.export_file(
            filename=filename,
            text=text,
            binary=binary,
            text_encoding=text_encoding,
        )

    def cancel(self) -> None:
        """
        Flags this export as cancelled/invalidated, with a timestamp.

        May trigger a resend (which is the point).
        """
        self.cancelled = True
        self.cancelled_at_utc = get_now_utc_datetime()

    @classmethod
    def task_already_exported(cls,
                              dbsession: SqlASession,
                              recipient_name: str,
                              basetable: str,
                              task_pk: int) -> bool:
        """
        Has the specified task already been successfully exported (and that
        export not subsequently cancelled) to a recipient of this name?

        Args:
            dbsession: a :class:`sqlalchemy.orm.session.Session`
            recipient_name:
            basetable: name of the task's base table
            task_pk: server PK of the task

        Returns:
            does a successful export record exist for this task?

        """
        query = dbsession.query(cls).join(cls.recipient)
        query = query.filter(ExportRecipient.recipient_name == recipient_name)
        query = query.filter(cls.basetable == basetable)
        query = query.filter(cls.task_server_pk == task_pk)
        query = query.filter(cls.success == True)  # noqa: E712
        query = query.filter(cls.cancelled == False)  # noqa: E712
        return bool_from_exists_clause(dbsession, query.exists())

492 

493 

494# ============================================================================= 

495# HL7 export 

496# ============================================================================= 

497 

class ExportedTaskHL7Message(Base):
    """
    Represents an individual HL7 message: the record of one attempt to send
    one exported task to an HL7 recipient.
    """
    __tablename__ = "_exported_task_hl7msg"

    id = Column(
        "id", BigInteger, primary_key=True, autoincrement=True,
        comment="Arbitrary primary key"
    )
    exported_task_id = Column(
        "exported_task_id", BigInteger, ForeignKey(ExportedTask.id),
        nullable=False,
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}"
    )
    sent_at_utc = Column(
        "sent_at_utc", DateTime,
        comment="Time message was sent at (UTC)"
    )
    reply_at_utc = Column(
        "reply_at_utc", DateTime,
        comment="Time message was replied to (UTC)"
    )
    success = Column(
        "success", Boolean,
        comment="Message sent successfully and acknowledged by HL7 server"
    )
    failure_reason = Column(
        "failure_reason", Text,
        comment="Reason for failure"
    )
    message = Column(
        "message", LongText,
        comment="Message body, if kept"
    )
    reply = Column(
        "reply", Text,
        comment="Server's reply, if kept"
    )

    exported_task = relationship(ExportedTask)

    def __init__(self,
                 exported_task: ExportedTask = None,
                 *args, **kwargs) -> None:
        """
        Must support parameter-free construction, not least for
        :func:`merge_db`.

        Args:
            exported_task: the parent :class:`ExportedTask`
        """
        super().__init__(*args, **kwargs)
        self.exported_task = exported_task

        # In-memory HL7 message; built by make_hl7_message(), not persisted
        # (the text form may be saved separately in self.message).
        self._hl7_msg = None  # type: Optional[hl7.Message]

    @reconstructor
    def init_on_load(self) -> None:
        """
        Called when SQLAlchemy recreates an object; see
        https://docs.sqlalchemy.org/en/latest/orm/constructors.html.
        """
        self._hl7_msg = None

    @staticmethod
    def task_acceptable_for_hl7(recipient: ExportRecipient,
                                task: "Task") -> bool:
        """
        Is the task valid for HL7 export? The task must exist, must not be
        anonymous, and must have a patient who has an ID number of the
        recipient's primary ID number type (which must itself be set).

        Args:
            recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
            task: a :class:`camcops_server.cc_modules.cc_task.Task` object

        Returns:
            bool: valid?

        """  # noqa
        if not task:
            return False
        if task.is_anonymous:
            return False  # Cannot send anonymous tasks via HL7
        patient = task.patient
        if not patient:
            return False
        if not recipient.primary_idnum:
            return False  # required for HL7
        if not patient.has_idnum_type(recipient.primary_idnum):
            return False
        return True

    def valid(self) -> bool:
        """
        Checks for internal validity (via :meth:`task_acceptable_for_hl7`);
        returns a bool.
        """
        exported_task = self.exported_task
        task = exported_task.task
        recipient = exported_task.recipient
        return self.task_acceptable_for_hl7(recipient, task)

    def succeed(self, now: Pendulum = None) -> None:
        """
        Record that we succeeded, and so did our associated task export.

        Args:
            now: time to record as the sending time; if not given (or falsy),
                the current UTC time is used
        """
        now = now or get_now_utc_datetime()
        self.success = True
        self.sent_at_utc = now
        self.exported_task.succeed()

    def abort(self, msg: str, diverted_not_sent: bool = False) -> None:
        """
        Record that we failed, and so did our associated task export.

        The detailed reason is stored on this HL7 record; the parent
        :class:`ExportedTask` receives only a generic message.

        (Called ``abort`` not ``fail`` because PyCharm has a bug relating to
        functions named ``fail``:
        https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

        Args:
            msg: reason for failure
            diverted_not_sent: deliberately diverted (and not counted as sent)
                rather than a sending failure?
        """
        self.success = False
        self.failure_reason = msg
        self.exported_task.abort(
            "HL7 message deliberately not sent; diverted to file"
            if diverted_not_sent else "HL7 sending failed"
        )

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Exports the task itself to an HL7 message: builds the message, then
        either diverts it to a file (debugging option) or transmits it.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        if not self.valid():
            self.abort(
                "Unsuitable for HL7; should have been filtered out earlier")
            return
        self.make_hl7_message(req)
        recipient = self.exported_task.recipient
        if recipient.hl7_debug_divert_to_file:
            self.divert_to_file(req)
        else:
            # Proper HL7 message
            self.transmit_hl7()

    def divert_to_file(self, req: "CamcopsRequest") -> None:
        """
        Write an HL7 message to a file. For debugging.

        Depending on the recipient's settings, the diverted message is then
        either treated as sent (success) or recorded as deliberately not sent.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        exported_task = self.exported_task
        recipient = exported_task.recipient
        filename = recipient.get_filename(req, exported_task.task,
                                          override_task_format="hl7")
        now_utc = get_now_utc_pendulum()
        log.info("Diverting HL7 message to file {!r}", filename)
        written = exported_task.export_file(filename=filename,
                                            text=str(self._hl7_msg))
        if not written:
            # The file export has already recorded the failure.
            return

        if recipient.hl7_debug_treat_diverted_as_sent:
            self.sent_at_utc = now_utc
            self.succeed(now_utc)
        else:
            self.abort("Exported to file as requested but not sent via HL7",
                       diverted_not_sent=True)

    def make_hl7_message(self, req: "CamcopsRequest") -> None:
        """
        Makes an HL7 message and stores it in ``self._hl7_msg``.

        May also store it in ``self.message`` (which is saved to the database),
        if we're saving HL7 messages.

        See

        - https://python-hl7.readthedocs.org/en/latest/index.html

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        task = self.exported_task.task
        recipient = self.exported_task.recipient

        # ---------------------------------------------------------------------
        # Parts
        # ---------------------------------------------------------------------
        # NOTE(review): self.id is an autoincrement PK, so presumably it is
        # None until this object has been flushed to the database -- confirm
        # that callers flush first, or the control ID will be "None".
        msh_segment = make_msh_segment(
            message_datetime=req.now,
            message_control_id=str(self.id)
        )
        pid_segment = task.get_patient_hl7_pid_segment(req, recipient)
        other_segments = task.get_hl7_data_segments(req, recipient)

        # ---------------------------------------------------------------------
        # Whole message
        # ---------------------------------------------------------------------
        segments = [msh_segment, pid_segment] + other_segments
        self._hl7_msg = hl7.Message(SEGMENT_SEPARATOR, segments)
        if recipient.hl7_keep_message:
            self.message = str(self._hl7_msg)

    def transmit_hl7(self) -> None:
        """
        Sends the HL7 message over TCP/IP, and records the outcome (success,
        or failure with a reason) on this object and its parent export.

        - Default MLLP/HL7 port is 2575
        - MLLP = minimum lower layer protocol

          - https://www.cleo.com/support/byproduct/lexicom/usersguide/mllp_configuration.htm
          - https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml?search=hl7
          - Essentially just a TCP socket with a minimal wrapper:
            https://stackoverflow.com/questions/11126918

        - https://python-hl7.readthedocs.org/en/latest/api.html; however,
          we've modified that
        """  # noqa
        recipient = self.exported_task.recipient

        if recipient.hl7_ping_first:
            pinged = self.ping_hl7_server(recipient)
            if not pinged:
                self.abort("Could not ping HL7 host")
                return

        try:
            log.info("Sending HL7 message to {}:{}",
                     recipient.hl7_host, recipient.hl7_port)
            with MLLPTimeoutClient(recipient.hl7_host,
                                   recipient.hl7_port,
                                   recipient.hl7_network_timeout_ms) as client:
                server_replied, reply = client.send_message(self._hl7_msg)
        except socket.timeout:
            self.abort("Failed to send message via MLLP: timeout")
            return
        except Exception as e:
            # Deliberately broad: any transmission problem is recorded as an
            # export failure rather than propagated.
            self.abort(f"Failed to send message via MLLP: {e}")
            return

        if not server_replied:
            self.abort("No response from server")
            return

        self.reply_at_utc = get_now_utc_datetime()
        if recipient.hl7_keep_reply:
            self.reply = reply

        try:
            replymsg = hl7.parse(reply)
        except Exception as e:
            self.abort(f"Malformed reply: {e}")
            return

        success, failure_reason = msg_is_successful_ack(replymsg)
        if success:
            self.succeed()
        else:
            self.abort(failure_reason)

    @staticmethod
    def ping_hl7_server(recipient: ExportRecipient) -> bool:
        # noinspection HttpUrlsUsage
        """
        Performs a TCP/IP ping on our HL7 server; returns success.

        The ping timeout is the recipient's HL7 network timeout, converted
        from milliseconds to whole seconds, with a minimum of 1 second.

        (No HL7 PING method yet. Proposal is
        http://hl7tsc.org/wiki/index.php?title=FTSD-ConCalls-20081028
        So use TCP/IP ping.)

        Args:
            recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`

        Returns:
            bool: success

        """  # noqa
        # max(), not min(): min() would cap the timeout at 1 s however long
        # the configured timeout, and would give 0 s for sub-second timeouts.
        timeout_s = max(recipient.hl7_network_timeout_ms // 1000, 1)
        if ping(hostname=recipient.hl7_host, timeout_s=timeout_s):
            return True
        else:
            log.error("Failed to ping {!r}", recipient.hl7_host)
            return False

784 

785 

786# ============================================================================= 

787# File export 

788# ============================================================================= 

789 

790class ExportedTaskFileGroup(Base): 

791 """ 

792 Represents a small set of files exported in relation to a single task. 

793 """ 

794 __tablename__ = "_exported_task_filegroup" 

795 

796 id = Column( 

797 "id", BigInteger, primary_key=True, autoincrement=True, 

798 comment="Arbitrary primary key" 

799 ) 

800 exported_task_id = Column( 

801 "exported_task_id", BigInteger, ForeignKey(ExportedTask.id), 

802 nullable=False, 

803 comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}" 

804 ) 

805 filenames = Column( 

806 "filenames", StringListType, 

807 comment="List of filenames exported" 

808 ) 

809 script_called = Column( 

810 "script_called", Boolean, default=False, nullable=False, 

811 comment=( 

812 f"Was the {ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} " 

813 f"script called?" 

814 ) 

815 ) 

816 script_retcode = Column( 

817 "script_retcode", Integer, 

818 comment=( 

819 f"Return code from the " 

820 f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script" 

821 ) 

822 ) 

823 script_stdout = Column( 

824 "script_stdout", UnicodeText, 

825 comment=( 

826 f"stdout from the " 

827 f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script" 

828 ) 

829 ) 

830 script_stderr = Column( 

831 "script_stderr", UnicodeText, 

832 comment=( 

833 f"stderr from the " 

834 f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script" 

835 ) 

836 ) 

837 

838 exported_task = relationship(ExportedTask) 

839 

840 def __init__(self, exported_task: ExportedTask = None) -> None: 

841 """ 

842 Args: 

843 exported_task: :class:`ExportedTask` object 

844 """ 

845 self.exported_task = exported_task 

846 

847 def export_file(self, 

848 filename: str, 

849 text: str = None, 

850 binary: bytes = None, 

851 text_encoding: str = UTF8) -> False: 

852 """ 

853 Exports the file. 

854 

855 Args: 

856 filename: 

857 text: text contents (specify this XOR ``binary``) 

858 binary: binary contents (specify this XOR ``text``) 

859 text_encoding: encoding to use when writing text 

860 

861 Returns: 

862 bool: was it exported? 

863 """ 

864 assert bool(text) != bool(binary), "Specify text XOR binary" 

865 exported_task = self.exported_task 

866 filename = os.path.abspath(filename) 

867 directory = os.path.dirname(filename) 

868 recipient = exported_task.recipient 

869 

870 if not recipient.file_overwrite_files and os.path.isfile(filename): 

871 self.abort(f"File already exists: {filename!r}") 

872 return False 

873 

874 if recipient.file_make_directory: 

875 try: 

876 mkdir_p(directory) 

877 except Exception as e: 

878 self.abort(f"Couldn't make directory {directory!r}: {e}") 

879 return False 

880 

881 try: 

882 log.debug("Writing to {!r}", filename) 

883 if text: 

884 with open(filename, mode="w", encoding=text_encoding) as f: 

885 f.write(text) 

886 else: 

887 with open(filename, mode="wb") as f: 

888 f.write(binary) 

889 except Exception as e: 

890 self.abort(f"Failed to open or write file {filename!r}: {e}") 

891 return False 

892 

893 self.note_exported_file(filename) 

894 return True 

895 

896 def note_exported_file(self, *filenames: str) -> None: 

897 """ 

898 Records a filename that has been exported, or several. 

899 

900 Args: 

901 *filenames: filenames 

902 """ 

903 if self.filenames is None: 

904 self.filenames = list(filenames) 

905 else: 

906 # See ExportedTask._add_failure_reason() above: 

907 # noinspection PyAugmentAssignment,PyTypeChecker 

908 self.filenames = self.filenames + list(filenames) 

909 

910 def export_task(self, req: "CamcopsRequest") -> None: 

911 """ 

912 Exports the task itself to a file. 

913 

914 Args: 

915 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

916 """ 

917 exported_task = self.exported_task 

918 task = exported_task.task 

919 recipient = exported_task.recipient 

920 task_format = recipient.task_format 

921 task_filename = recipient.get_filename(req, task) 

922 rio_metadata_filename = change_filename_ext( 

923 task_filename, ".metadata").replace(" ", "") 

924 # ... in case we use it. No spaces in its filename. 

925 

926 # Before we calculate the PDF, etc., we can pre-check for existing 

927 # files. 

928 if not recipient.file_overwrite_files: 

929 target_filenames = [task_filename] 

930 if recipient.file_export_rio_metadata: 

931 target_filenames.append(rio_metadata_filename) 

932 for fname in target_filenames: 

933 if os.path.isfile(os.path.abspath(fname)): 

934 self.abort(f"File already exists: {fname!r}") 

935 return 

936 

937 # Export task 

938 if task_format == FileType.PDF: 

939 binary = task.get_pdf(req) 

940 text = None 

941 elif task_format == FileType.HTML: 

942 binary = None 

943 text = task.get_html(req) 

944 elif task_format == FileType.XML: 

945 binary = None 

946 text = task.get_xml(req) 

947 else: 

948 raise AssertionError("Unknown task_format") 

949 written = self.export_file(task_filename, text=text, binary=binary, 

950 text_encoding=UTF8) 

951 if not written: 

952 return 

953 

954 # RiO metadata too? 

955 if recipient.file_export_rio_metadata: 

956 

957 metadata = task.get_rio_metadata( 

958 req, 

959 recipient.rio_idnum, 

960 recipient.rio_uploading_user, 

961 recipient.rio_document_type 

962 ) 

963 # We're going to write in binary mode, to get the newlines right. 

964 # One way is: 

965 # with codecs.open(filename, mode="w", encoding="ascii") as f: 

966 # f.write(metadata.replace("\n", DOS_NEWLINE)) 

967 # Here's another. 

968 metadata = metadata.replace("\n", DOS_NEWLINE) 

969 # ... Servelec say CR = "\r", but DOS is \r\n. 

970 metadata_binary = metadata.encode("ascii") 

971 # UTF-8 is NOT supported by RiO for metadata. 

972 written_metadata = self.export_file(rio_metadata_filename, 

973 binary=metadata_binary) 

974 if not written_metadata: 

975 return 

976 

977 self.finish_run_script_if_necessary() 

978 

979 def succeed(self) -> None: 

980 """ 

981 Register success. 

982 """ 

983 self.exported_task.succeed() 

984 

def abort(self, msg: str) -> None:
    """
    Mark this export as failed, storing the reason.

    Delegates to ``self.exported_task.abort(msg)``.

    (The method is named ``abort`` rather than ``fail`` because PyCharm has
    a bug relating to functions named ``fail``:
    https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

    Args:
        msg: the reason for the failure
    """
    task_record = self.exported_task
    task_record.abort(msg)

997 

def finish_run_script_if_necessary(self) -> None:
    """
    Completes the file export by running the external script, if required.

    The script named by ``recipient.file_script_after_export`` is run only
    if at least one filename has been recorded in ``self.filenames`` and a
    script is configured. It is executed directly (argument list, no shell)
    with the exported filenames as its arguments.

    Results are stored on this object:

    - ``script_called``: whether the script was launched;
    - ``script_stdout`` / ``script_stderr``: the script's output, decoded
      with the interpreter's default encoding;
    - ``script_retcode``: the script's return code (only set if the script
      was launched).

    If the script cannot be launched, the export is aborted with the message
    "Failed to run script" and the exception text is stored in
    ``script_stderr``. Otherwise the export is marked as successful,
    whatever the script's return code.
    """
    recipient = self.exported_task.recipient
    script = recipient.file_script_after_export
    if self.filenames and script:
        args = [script] + self.filenames
        try:
            # Only the launch itself is guarded: a failure here means the
            # script genuinely did not run.
            completed = subprocess.run(
                args,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except Exception as e:
            self.script_called = False
            self.script_stdout = ""
            self.script_stderr = str(e)
            self.abort("Failed to run script")
            return
        encoding = sys.getdefaultencoding()
        self.script_called = True
        # errors="replace": a script that ran but emitted bytes that are
        # invalid in our encoding must not be misreported as "failed to
        # run"; keep what can be decoded and substitute the rest.
        self.script_stdout = completed.stdout.decode(encoding,
                                                     errors="replace")
        self.script_stderr = completed.stderr.decode(encoding,
                                                     errors="replace")
        self.script_retcode = completed.returncode
    self.succeed()

1021 

1022 

1023# ============================================================================= 

1024# E-mail export 

1025# ============================================================================= 

1026 

class ExportedTaskEmail(Base):
    """
    One e-mail export of a task: links an :class:`ExportedTask` to the
    :class:`Email` object created to carry it.
    """
    __tablename__ = "_exported_task_email"

    id = Column(
        "id",
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id = Column(
        "exported_task_id",
        BigInteger,
        ForeignKey(ExportedTask.id),
        nullable=False,
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )
    email_id = Column(
        "email_id",
        BigInteger,
        ForeignKey(Email.id),
        comment=f"FK to {Email.__tablename__}.{Email.id.name}",
    )

    exported_task = relationship(ExportedTask)
    email = relationship(Email)

    def __init__(self, exported_task: ExportedTask = None) -> None:
        """
        Args:
            exported_task: the :class:`ExportedTask` this e-mail export
                belongs to
        """
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Renders the task in the recipient's chosen format, attaches it to a
        new e-mail, sends that e-mail, and records success or failure on the
        associated :class:`ExportedTask`.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            AssertionError: if the recipient's task format is not PDF, HTML
                or XML
        """
        charset = "utf8"
        task_record = self.exported_task
        task = task_record.task
        recipient = task_record.recipient
        fmt = recipient.task_format
        # Attachment name only: a full path makes no sense in an e-mail.
        attachment_name = os.path.basename(recipient.get_filename(req, task))

        # Render the task; text formats are encoded to bytes for attachment.
        if fmt == FileType.PDF:
            payload = task.get_pdf(req)
        else:
            if fmt == FileType.HTML:
                markup = task.get_html(req)
            elif fmt == FileType.XML:
                markup = task.get_xml(req)
            else:
                raise AssertionError("Unknown task_format")
            payload = markup.encode(charset)
        attachments = [
            (attachment_name, payload),
        ]  # type: List[Tuple[str, bytes]]

        # Build the message (its date is set automatically).
        subject = recipient.get_email_subject(req, task)
        body = recipient.get_email_body(req, task)
        if recipient.email_body_as_html:
            content_type = CONTENT_TYPE_HTML
        else:
            content_type = CONTENT_TYPE_TEXT
        message = Email(
            from_addr=recipient.email_from,
            sender=recipient.email_sender,
            reply_to=recipient.email_reply_to,
            to=recipient.email_to,
            cc=recipient.email_cc,
            bcc=recipient.email_bcc,
            subject=subject,
            body=body,
            content_type=content_type,
            charset=charset,
            attachments_binary=attachments,
            save_msg_string=recipient.email_keep_message,
        )
        self.email = message

        # Send it and record the outcome.
        message.send(
            host=recipient.email_host,
            username=recipient.email_host_username,
            password=recipient.email_host_password,
            port=recipient.email_port,
            use_tls=recipient.email_use_tls,
        )
        if not message.sent:
            task_record.abort("Failed to send e-mail")
            return
        task_record.succeed()

1113 

1114 

1115# ============================================================================= 

1116# REDCap export 

1117# ============================================================================= 

1118 

class ExportedTaskRedcap(Base):
    """
    One REDCap export of a task, with the REDCap identifiers that were used
    kept for audit purposes.
    """
    __tablename__ = "_exported_task_redcap"

    id = Column(
        "id",
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id = Column(
        "exported_task_id",
        BigInteger,
        ForeignKey(ExportedTask.id),
        nullable=False,
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )

    exported_task = relationship(ExportedTask)

    # The REDCap identifiers below are stored purely as an audit trail.
    redcap_record_id = Column(
        "redcap_record_id",
        UnicodeText,
        comment=(
            "ID of the (patient) record on the REDCap instance where "
            "this task has been exported"
        ),
    )

    redcap_instrument_name = Column(
        "redcap_instrument_name",
        UnicodeText,
        comment=(
            "The name of the REDCap instrument name (form) where this "
            "task has been exported"
        ),
    )

    redcap_instance_id = Column(
        "redcap_instance_id",
        Integer,
        comment=(
            "1-based index of this particular task within the patient "
            "record. Increments on every repeat attempt."
        ),
    )

    def __init__(self, exported_task: ExportedTask = None) -> None:
        """
        Args:
            exported_task: the :class:`ExportedTask` this REDCap export
                belongs to
        """
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Sends the task to REDCap via a :class:`RedcapTaskExporter` and
        records the outcome on the associated :class:`ExportedTask`. A
        :class:`RedcapExportException` is recorded as a failure, with the
        exception text as the reason.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        task_record = self.exported_task
        redcap_exporter = RedcapTaskExporter()

        try:
            redcap_exporter.export_task(req, self)
            task_record.succeed()
        except RedcapExportException as exc:
            reason = str(exc)
            task_record.abort(reason)

1178 

1179 

1180# ============================================================================= 

1181# FHIR export 

1182# ============================================================================= 

1183 

class ExportedTaskFhir(Base):
    """
    One FHIR export of a task. Per-resource details are held in the related
    ``ExportedTaskFhirEntry`` rows (``entries``).
    """
    __tablename__ = "_exported_task_fhir"

    id = Column(
        "id",
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )

    exported_task_id = Column(
        "exported_task_id",
        BigInteger,
        ForeignKey(ExportedTask.id),
        nullable=False,
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )

    exported_task = relationship(ExportedTask)

    entries = relationship("ExportedTaskFhirEntry")

    def __init__(self, exported_task: ExportedTask = None) -> None:
        """
        Args:
            exported_task: the :class:`ExportedTask` this FHIR export
                belongs to
        """
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Sends the task to FHIR via a :class:`FhirTaskExporter` and records
        the outcome on the associated :class:`ExportedTask`. A
        :class:`FhirExportException` is recorded as a failure, with the
        exception text as the reason.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        task_record = self.exported_task

        try:
            fhir_exporter = FhirTaskExporter(req, self)
            fhir_exporter.export_task()
            task_record.succeed()
        except FhirExportException as exc:
            reason = str(exc)
            task_record.abort(reason)

1227 

1228 

class ExportedTaskFhirEntry(Base):
    """
    Per-resource details (Patients, Questionnaires, QuestionnaireResponses)
    of what was exported to the FHIR server for a single task.
    """
    __tablename__ = "_exported_task_fhir_entry"

    id = Column(
        "id",
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )

    exported_task_fhir_id = Column(
        "exported_task_fhir_id",
        Integer,
        ForeignKey(ExportedTaskFhir.id),
        nullable=False,
        comment="FK to {}.{}".format(
            ExportedTaskFhir.__tablename__,
            ExportedTaskFhir.id.name,
        ),
    )

    etag = Column(
        "etag",
        UnicodeText,
        comment="The Etag for the resource (if relevant)",
    )

    last_modified = Column(
        "last_modified",
        DateTime,
        comment="Server's date time modified.",
    )

    location = Column(
        "location",
        UnicodeText,
        comment="The location (if the operation returns a location).",
    )

    status = Column(
        "status",
        UnicodeText,
        comment="Status response code (text optional).",
    )

    # TODO: outcome?