Coverage for cc_modules/cc_exportmodels.py: 33%
464 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
1"""
2camcops_server/cc_modules/cc_exportmodels.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Define models for export functions (e.g. HL7, file-based export).**
28"""
30import datetime
31import logging
32import os
33import posixpath
34import socket
35import subprocess
36import sys
37from typing import Any, Generator, List, Optional, Tuple, TYPE_CHECKING
39from cardinal_pythonlib.datetimefunc import (
40 get_now_utc_datetime,
41 get_now_utc_pendulum,
42)
43from cardinal_pythonlib.email.sendmail import (
44 CONTENT_TYPE_HTML,
45 CONTENT_TYPE_TEXT,
46)
47from cardinal_pythonlib.fileops import mkdir_p
48from cardinal_pythonlib.logs import BraceStyleAdapter
49from cardinal_pythonlib.network import ping
50from cardinal_pythonlib.sqlalchemy.list_types import StringListType
51from cardinal_pythonlib.sqlalchemy.orm_query import bool_from_exists_clause
52import hl7
53from pendulum import DateTime as Pendulum
54from sqlalchemy.orm import (
55 Mapped,
56 mapped_column,
57 reconstructor,
58 relationship,
59 Session as SqlASession,
60)
61from sqlalchemy.sql.schema import ForeignKey
62from sqlalchemy.sql.sqltypes import (
63 BigInteger,
64 Text,
65 UnicodeText,
66)
68from camcops_server.cc_modules.cc_constants import (
69 ConfigParamExportRecipient,
70 FileType,
71 UTF8,
72)
73from camcops_server.cc_modules.cc_email import Email
74from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
75from camcops_server.cc_modules.cc_exportrecipientinfo import (
76 ExportTransmissionMethod,
77)
78from camcops_server.cc_modules.cc_fhir import (
79 FhirExportException,
80 FhirTaskExporter,
81)
82from camcops_server.cc_modules.cc_filename import change_filename_ext
83from camcops_server.cc_modules.cc_hl7 import (
84 make_msh_segment,
85 MLLPTimeoutClient,
86 msg_is_successful_ack,
87 SEGMENT_SEPARATOR,
88)
89from camcops_server.cc_modules.cc_redcap import (
90 RedcapExportException,
91 RedcapTaskExporter,
92)
93from camcops_server.cc_modules.cc_sqla_coltypes import (
94 LongText,
95 TableNameColType,
96)
97from camcops_server.cc_modules.cc_sqlalchemy import Base
98from camcops_server.cc_modules.cc_taskcollection import (
99 TaskCollection,
100 TaskSortMethod,
101)
102from camcops_server.cc_modules.cc_taskfactory import (
103 task_factory_no_security_checks,
104)
106if TYPE_CHECKING:
107 from camcops_server.cc_modules.cc_request import CamcopsRequest
108 from camcops_server.cc_modules.cc_task import Task
110log = BraceStyleAdapter(logging.getLogger(__name__))
113# =============================================================================
114# Constants
115# =============================================================================
117DOS_NEWLINE = "\r\n"
120# =============================================================================
121# Create task collections for export
122# =============================================================================
def get_collection_for_export(
    req: "CamcopsRequest",
    recipient: ExportRecipient,
    via_index: bool = True,
    debug: bool = False,
) -> TaskCollection:
    """
    Builds the task collection that should be exported to a given recipient.

    The collection covers the tasks that the recipient wants and (for
    incremental exports) that have not already been sent. "Not already sent"
    means "not already sent to an export recipient with the same name (even
    if other aspects of the export recipient have changed)".

    The collection is created for current tasks only, using ascending
    creation date as the per-class sort method.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
        via_index: use the task index (faster)?
        debug: report details?

    Returns:
        a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`
    """  # noqa
    index_disabled = not via_index
    if index_disabled:
        log.debug("Task index disabled for get_collection_for_export()")

    tasks_for_recipient = TaskCollection(
        req=req,
        export_recipient=recipient,
        via_index=via_index,
        current_only=True,
        sort_method_by_class=TaskSortMethod.CREATION_DATE_ASC,
    )

    if not debug:
        return tasks_for_recipient

    log.debug(
        "get_collection_for_export(): recipient={!r}, collection={!r}",
        recipient,
        tasks_for_recipient,
    )
    return tasks_for_recipient
def gen_exportedtasks(
    collection: TaskCollection,
) -> Generator["ExportedTask", None, None]:
    """
    Yields one new :class:`ExportedTask` per task in a collection.

    Each export record is added to the collection's database session before
    it is handed to the caller.

    Args:
        collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`;
            it must have an export recipient

    Yields:
        :class:`ExportedTask` objects

    """  # noqa
    session = collection.dbsession
    export_target = collection.export_recipient
    assert export_target is not None, "TaskCollection has no export_recipient"
    for exportable in collection.gen_tasks_by_class():
        record = ExportedTask(recipient=export_target, task=exportable)
        session.add(record)
        yield record
def gen_tasks_having_exportedtasks(
    collection: TaskCollection,
) -> Generator["Task", None, None]:
    """
    Yields the tasks in a collection, writing an export log entry for each.

    Used for database exports.

    Args:
        collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`

    Yields:
        :class:`camcops_server.cc_modules.cc_task.Task` objects

    """  # noqa
    export_records = gen_exportedtasks(collection)
    for record in export_records:
        yield record.task
        # We only get back here when the consumer asks for the next task, so
        # success is registered after the caller has dealt with this one.
        record.succeed()
209# =============================================================================
210# ExportedTask class
211# =============================================================================
class ExportedTask(Base):
    """
    Class representing an attempt to export a task (as part of a
    :class:`ExportRun`) to a specific
    :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`.

    Each row records which task was involved (``basetable`` plus
    ``task_server_pk``), the recipient, start/finish times, whether the
    attempt succeeded, any failure reasons, and whether the export was later
    cancelled. Detail specific to the transmission method lives in the
    related e-mail/FHIR/file group/HL7/REDCap records.
    """

    __tablename__ = "_exported_tasks"

    id: Mapped[int] = mapped_column(
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    recipient_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportRecipient.id),
        comment=f"FK to {ExportRecipient.__tablename__}.{ExportRecipient.id.name}",  # noqa
    )
    basetable: Mapped[str] = mapped_column(
        TableNameColType,
        index=True,
        comment="Base table of task concerned",
    )
    task_server_pk: Mapped[int] = mapped_column(
        index=True,
        comment="Server PK of task in basetable (_pk field)",
    )
    start_at_utc: Mapped[datetime.datetime] = mapped_column(
        index=True,
        comment="Time export was started (UTC)",
    )
    finish_at_utc: Mapped[Optional[datetime.datetime]] = mapped_column(
        comment="Time export was finished (UTC)"
    )
    success: Mapped[bool] = mapped_column(
        default=False,
        comment="Task exported successfully?",
    )
    failure_reasons: Mapped[Optional[List[str]]] = mapped_column(
        "failure_reasons", StringListType, comment="Reasons for failure"
    )
    cancelled: Mapped[bool] = mapped_column(
        default=False,
        comment="Export subsequently cancelled/invalidated (may trigger resend)",  # noqa
    )
    cancelled_at_utc: Mapped[Optional[datetime.datetime]] = mapped_column(
        comment="Time export was cancelled at (UTC)",
    )

    recipient = relationship(ExportRecipient)

    emails = relationship("ExportedTaskEmail", back_populates="exported_task")
    fhir_exports = relationship(
        "ExportedTaskFhir", back_populates="exported_task"
    )
    filegroups = relationship(
        "ExportedTaskFileGroup", back_populates="exported_task"
    )
    hl7_messages = relationship(
        "ExportedTaskHL7Message", back_populates="exported_task"
    )
    redcap_exports = relationship(
        "ExportedTaskRedcap", back_populates="exported_task"
    )

    def __init__(
        self,
        recipient: Optional[ExportRecipient] = None,
        task: Optional["Task"] = None,
        basetable: Optional[str] = None,
        task_server_pk: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        """
        Can initialize with a task, or a basetable/task_server_pk combination.

        The export start time is set to the current UTC time.

        Args:
            recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
            task: a :class:`camcops_server.cc_modules.cc_task.Task` object
            basetable: base table name of the task
            task_server_pk: server PK of the task

        (However, we must also support a no-parameter constructor, not least
        for our :func:`merge_db` function.)
        """  # noqa
        super().__init__(**kwargs)
        self.recipient = recipient
        self.start_at_utc = get_now_utc_datetime()
        if task:
            assert (
                not basetable
            ) and task_server_pk is None, (
                "Task specified; mustn't specify basetable/task_server_pk"
            )
            self.basetable = task.tablename
            self.task_server_pk = task.pk
            self._task = task
        else:
            self.basetable = basetable
            self.task_server_pk = task_server_pk
            self._task = None  # loaded lazily by the "task" property

    @reconstructor
    def init_on_load(self) -> None:
        """
        Called when SQLAlchemy recreates an object; see
        https://docs.sqlalchemy.org/en/latest/orm/constructors.html.

        Resets the (non-database) task cache, since ``__init__`` is not
        called in that situation.
        """
        self._task = None

    @property
    def task(self) -> Optional["Task"]:
        """
        Returns the associated task, loading it on first use.

        Returns ``None`` if the task cannot be retrieved (a ``KeyError`` from
        the task factory is logged and swallowed), so callers must be
        prepared for a missing task.
        """
        if self._task is None:
            dbsession = SqlASession.object_session(self)
            try:
                self._task = task_factory_no_security_checks(
                    dbsession, self.basetable, self.task_server_pk
                )
            except KeyError:
                log.warning(
                    "Failed to retrieve task for basetable={!r}, PK={!r}",
                    self.basetable,
                    self.task_server_pk,
                )
                self._task = None
        return self._task

    def succeed(self) -> None:
        """
        Register success, and record the finish time.
        """
        self.success = True
        self.finish()

    def abort(self, msg: str) -> None:
        """
        Record failure, and why; also records the finish time.

        (Called ``abort`` not ``fail`` because PyCharm has a bug relating to
        functions named ``fail``:
        https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

        Args:
            msg: why
        """
        self.success = False
        log.error("Task export failed: {}", msg)
        self._add_failure_reason(msg)
        self.finish()

    def _add_failure_reason(self, msg: str) -> None:
        """
        Writes to our ``failure_reasons`` list in a way that (a) obviates the
        need to create an empty list via ``__init__()``, and (b) will
        definitely mark it as dirty, so it gets saved to the database.

        See :class:`cardinal_pythonlib.sqlalchemy.list_types.StringListType`.

        Args:
            msg: the message
        """
        if self.failure_reasons is None:
            self.failure_reasons = [msg]
        else:
            # Do not use .append(); that won't mark the record as dirty.
            # Don't use "+="; similarly, that calls list.__iadd__(), not
            # InstrumentedAttribute.__set__().
            # noinspection PyAugmentAssignment
            self.failure_reasons = self.failure_reasons + [msg]

    def finish(self) -> None:
        """
        Records the finish time (now, in UTC).
        """
        self.finish_at_utc = get_now_utc_datetime()

    def export(self, req: "CamcopsRequest") -> None:
        """
        Performs an export of the specific task, via whichever transmission
        method the recipient uses.

        If the task itself cannot be loaded, the export is recorded as a
        failure and nothing is dispatched.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            AssertionError: if the recipient's transmission method is not
                one of those handled here
        """
        dbsession = req.dbsession
        recipient = self.recipient
        transmission_method = recipient.transmission_method
        task = self.task
        log.info("Exporting task {!r} to recipient {}", task, recipient)

        if task is None:
            # The "task" property returns None when the task can't be
            # retrieved. The method-specific exporters all need a real task,
            # so record a failure here rather than letting them crash.
            self.abort(
                f"Task not found: basetable={self.basetable!r}, "
                f"PK={self.task_server_pk!r}"
            )
            return

        if transmission_method == ExportTransmissionMethod.EMAIL:
            email = ExportedTaskEmail(self)
            dbsession.add(email)
            email.export_task(req)

        elif transmission_method == ExportTransmissionMethod.FHIR:
            efhir = ExportedTaskFhir(self)
            dbsession.add(efhir)
            dbsession.flush()
            efhir.export_task(req)

        elif transmission_method == ExportTransmissionMethod.FILE:
            efg = ExportedTaskFileGroup(self)
            dbsession.add(efg)
            efg.export_task(req)

        elif transmission_method == ExportTransmissionMethod.HL7:
            ehl7 = ExportedTaskHL7Message(self)
            if ehl7.valid():
                dbsession.add(ehl7)
                ehl7.export_task(req)
            else:
                self.abort("Task not valid for HL7 export")

        elif transmission_method == ExportTransmissionMethod.REDCAP:
            eredcap = ExportedTaskRedcap(self)
            dbsession.add(eredcap)
            eredcap.export_task(req)

        else:
            raise AssertionError("Bug: bad transmission_method")

    @property
    def filegroup(self) -> "ExportedTaskFileGroup":
        """
        Returns a :class:`ExportedTaskFileGroup`, creating it if necessary.

        If one or more file groups already exist, the first is returned.
        """
        if self.filegroups:
            # noinspection PyUnresolvedReferences
            filegroup = self.filegroups[0]  # type: ExportedTaskFileGroup
        else:
            filegroup = ExportedTaskFileGroup(self)
            # noinspection PyUnresolvedReferences
            self.filegroups.append(filegroup)
        return filegroup

    def export_file(
        self,
        filename: str,
        text: Optional[str] = None,
        binary: Optional[bytes] = None,
        text_encoding: str = UTF8,
    ) -> bool:
        """
        Exports a file, via our file group (see :attr:`filegroup`).

        Args:
            filename:
            text: text contents (specify this XOR ``binary``)
            binary: binary contents (specify this XOR ``text``)
            text_encoding: encoding to use when writing text

        Returns: was it exported?
        """
        filegroup = self.filegroup
        return filegroup.export_file(
            filename=filename,
            text=text,
            binary=binary,
            text_encoding=text_encoding,
        )

    def cancel(self) -> None:
        """
        Marks the task export as cancelled/invalidated, and records when.

        May trigger a resend (which is the point).
        """
        self.cancelled = True
        self.cancelled_at_utc = get_now_utc_datetime()

    @classmethod
    def task_already_exported(
        cls,
        dbsession: SqlASession,
        recipient_name: str,
        basetable: str,
        task_pk: int,
    ) -> bool:
        """
        Has the specified task already been successfully exported?

        Only exports that succeeded and have not been cancelled count.
        Recipients are matched by name.

        Args:
            dbsession: a :class:`sqlalchemy.orm.session.Session`
            recipient_name:
            basetable: name of the task's base table
            task_pk: server PK of the task

        Returns:
            does a successful export record exist for this task?

        """
        exists_q = (
            dbsession.query(cls)
            .join(cls.recipient)
            .filter(ExportRecipient.recipient_name == recipient_name)
            .filter(cls.basetable == basetable)
            .filter(cls.task_server_pk == task_pk)
            .filter(cls.success == True)  # noqa: E712
            .filter(cls.cancelled == False)  # noqa: E712
            .exists()
        )
        return bool_from_exists_clause(dbsession, exists_q)
522# =============================================================================
523# HL7 export
524# =============================================================================
class ExportedTaskHL7Message(Base):
    """
    Represents an individual HL7 message.

    Records when the message was sent and replied to, whether it succeeded,
    any failure reason, and (optionally) the message and reply text. The
    outcome is also propagated to the parent :class:`ExportedTask`.
    """

    __tablename__ = "_exported_task_hl7msg"

    id: Mapped[int] = mapped_column(
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportedTask.id),
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )
    sent_at_utc: Mapped[Optional[datetime.datetime]] = mapped_column(
        comment="Time message was sent at (UTC)"
    )
    reply_at_utc: Mapped[Optional[datetime.datetime]] = mapped_column(
        comment="Time message was replied to (UTC)"
    )
    success: Mapped[Optional[bool]] = mapped_column(
        comment="Message sent successfully and acknowledged by HL7 server",
    )
    failure_reason: Mapped[Optional[str]] = mapped_column(
        Text, comment="Reason for failure"
    )
    message: Mapped[Optional[str]] = mapped_column(
        LongText, comment="Message body, if kept"
    )
    reply: Mapped[Optional[str]] = mapped_column(
        Text, comment="Server's reply, if kept"
    )

    exported_task = relationship(ExportedTask)

    def __init__(
        self, exported_task: Optional[ExportedTask] = None, **kwargs: Any
    ) -> None:
        """
        Must support parameter-free construction, not least for
        :func:`merge_db`.

        Args:
            exported_task: the parent :class:`ExportedTask` object
        """
        super().__init__(**kwargs)
        self.exported_task = exported_task

        # In-memory message object; not stored in the database.
        self._hl7_msg = None  # type: Optional[hl7.Message]

    @reconstructor
    def init_on_load(self) -> None:
        """
        Called when SQLAlchemy recreates an object; see
        https://docs.sqlalchemy.org/en/latest/orm/constructors.html.
        """
        self._hl7_msg = None

    @staticmethod
    def task_acceptable_for_hl7(
        recipient: ExportRecipient, task: "Task"
    ) -> bool:
        """
        Is the task valid for HL7 export? (For example, anonymous tasks and
        tasks missing key ID information may not be.)

        A task is acceptable only if it exists, is not anonymous, has a
        patient, the recipient defines a primary ID number type, and the
        patient has an ID number of that type.

        Args:
            recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
            task: a :class:`camcops_server.cc_modules.cc_task.Task` object

        Returns:
            bool: valid?

        """  # noqa
        if not task:
            return False
        if task.is_anonymous:
            return False  # Cannot send anonymous tasks via HL7
        patient = task.patient
        if not patient:
            return False
        if not recipient.primary_idnum:
            return False  # required for HL7
        if not patient.has_idnum_type(recipient.primary_idnum):
            return False
        return True

    def valid(self) -> bool:
        """
        Checks for internal validity; returns a bool.

        Delegates to :meth:`task_acceptable_for_hl7` using our parent
        export's task and recipient.
        """
        exported_task = self.exported_task
        task = exported_task.task
        recipient = exported_task.recipient
        return self.task_acceptable_for_hl7(recipient, task)

    def succeed(self, now: Optional[Pendulum] = None) -> None:
        """
        Record that we succeeded, and so did our associated task export.

        Args:
            now: time to record as the sending time; if not given, the
                current UTC time is used
        """
        now = now or get_now_utc_datetime()
        self.success = True
        self.sent_at_utc = now
        self.exported_task.succeed()

    def abort(self, msg: str, diverted_not_sent: bool = False) -> None:
        """
        Record that we failed, and so did our associated task export.

        The detailed reason is stored against this message; the parent
        :class:`ExportedTask` receives one of two generic reasons.

        (Called ``abort`` not ``fail`` because PyCharm has a bug relating to
        functions named ``fail``:
        https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

        Args:
            msg: reason for failure
            diverted_not_sent: deliberately diverted (and not counted as sent)
                rather than a sending failure?
        """
        self.success = False
        self.failure_reason = msg
        self.exported_task.abort(
            "HL7 message deliberately not sent; diverted to file"
            if diverted_not_sent
            else "HL7 sending failed"
        )

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Exports the task itself to an HL7 message.

        The message is built, then either diverted to a file (if the
        recipient is configured for that debugging option) or transmitted.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        if not self.valid():
            self.abort(
                "Unsuitable for HL7; should have been filtered out earlier"
            )
            return
        self.make_hl7_message(req)
        recipient = self.exported_task.recipient
        if recipient.hl7_debug_divert_to_file:
            self.divert_to_file(req)
        else:
            # Proper HL7 message
            self.transmit_hl7()

    def divert_to_file(self, req: "CamcopsRequest") -> None:
        """
        Write an HL7 message to a file. For debugging.

        If the file is written, the export is then recorded either as sent
        or as deliberately not sent, depending on the recipient's settings.
        If the file cannot be written, we return without recording anything
        further here.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        exported_task = self.exported_task
        recipient = exported_task.recipient
        filename = recipient.get_filename(
            req, exported_task.task, override_task_format="hl7"
        )
        now_utc = get_now_utc_pendulum()
        log.info("Diverting HL7 message to file {!r}", filename)
        written = exported_task.export_file(
            filename=filename, text=str(self._hl7_msg)
        )
        if not written:
            return

        if recipient.hl7_debug_treat_diverted_as_sent:
            self.sent_at_utc = now_utc
            self.succeed(now_utc)
        else:
            self.abort(
                "Exported to file as requested but not sent via HL7",
                diverted_not_sent=True,
            )

    def make_hl7_message(self, req: "CamcopsRequest") -> None:
        """
        Makes an HL7 message and stores it in ``self._hl7_msg``.

        May also store it in ``self.message`` (which is saved to the database),
        if we're saving HL7 messages.

        See

        - https://python-hl7.readthedocs.org/en/latest/index.html

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        task = self.exported_task.task
        recipient = self.exported_task.recipient

        # ---------------------------------------------------------------------
        # Parts
        # ---------------------------------------------------------------------
        # NOTE(review): self.id is an autoincrement PK, so it is presumably
        # only populated once this row has been flushed; confirm that a flush
        # has happened by this point, or the control ID will be "None".
        msh_segment = make_msh_segment(
            message_datetime=req.now, message_control_id=str(self.id)
        )
        pid_segment = task.get_patient_hl7_pid_segment(req, recipient)
        other_segments = task.get_hl7_data_segments(req, recipient)

        # ---------------------------------------------------------------------
        # Whole message
        # ---------------------------------------------------------------------
        segments = [msh_segment, pid_segment] + other_segments
        self._hl7_msg = hl7.Message(SEGMENT_SEPARATOR, segments)
        if recipient.hl7_keep_message:
            self.message = str(self._hl7_msg)

    def transmit_hl7(self) -> None:
        """
        Sends the HL7 message over TCP/IP.

        - Default MLLP/HL7 port is 2575
        - MLLP = minimum lower layer protocol

          - https://www.cleo.com/support/byproduct/lexicom/usersguide/mllp_configuration.htm
          - https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml?search=hl7
          - Essentially just a TCP socket with a minimal wrapper:
            https://stackoverflow.com/questions/11126918

        - https://python-hl7.readthedocs.org/en/latest/api.html; however,
          we've modified that

        Every failure path (ping failure, timeout, other sending error, no
        reply, malformed reply, negative acknowledgement) is recorded via
        :meth:`abort`; a successful acknowledgement via :meth:`succeed`.
        """  # noqa
        recipient = self.exported_task.recipient

        if recipient.hl7_ping_first:
            pinged = self.ping_hl7_server(recipient)
            if not pinged:
                self.abort("Could not ping HL7 host")
                return

        try:
            log.info(
                "Sending HL7 message to {}:{}",
                recipient.hl7_host,
                recipient.hl7_port,
            )
            with MLLPTimeoutClient(
                recipient.hl7_host,
                recipient.hl7_port,
                recipient.hl7_network_timeout_ms,
            ) as client:
                server_replied, reply = client.send_message(self._hl7_msg)
        except socket.timeout:
            self.abort("Failed to send message via MLLP: timeout")
            return
        except Exception as e:
            self.abort(f"Failed to send message via MLLP: {e}")
            return

        if not server_replied:
            self.abort("No response from server")
            return

        self.reply_at_utc = get_now_utc_datetime()
        if recipient.hl7_keep_reply:
            self.reply = reply

        try:
            replymsg = hl7.parse(reply)
        except Exception as e:
            self.abort(f"Malformed reply: {e}")
            return

        success, failure_reason = msg_is_successful_ack(replymsg)
        if success:
            self.succeed()
        else:
            self.abort(failure_reason)

    @staticmethod
    def ping_hl7_server(recipient: ExportRecipient) -> bool:
        # noinspection HttpUrlsUsage
        """
        Performs a TCP/IP ping on our HL7 server; returns success. The ping
        is performed on every call (no result is cached).

        (No HL7 PING method yet. Proposal is
        http://hl7tsc.org/wiki/index.php?title=FTSD-ConCalls-20081028
        So use TCP/IP ping.)

        Args:
            recipient: an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`

        Returns:
            bool: success

        """  # noqa
        # Convert the configured network timeout to whole seconds, with a
        # floor of one second. (Using min() here would cap the timeout at
        # 1 s and give a zero timeout for any setting below 1000 ms.)
        timeout_s = max(recipient.hl7_network_timeout_ms // 1000, 1)
        if ping(hostname=recipient.hl7_host, timeout_s=timeout_s):
            return True
        else:
            log.error("Failed to ping {!r}", recipient.hl7_host)
            return False
823# =============================================================================
824# File export
825# =============================================================================
class ExportedTaskFileGroup(Base):
    """
    Represents a small set of files exported in relation to a single task.

    Records the filenames written and, if a post-export script is
    configured for the recipient, whether it was called and what it
    returned/printed. Outcomes are propagated to the parent
    :class:`ExportedTask`.
    """

    __tablename__ = "_exported_task_filegroup"

    id: Mapped[int] = mapped_column(
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportedTask.id),
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )
    filenames: Mapped[Optional[List[str]]] = mapped_column(
        StringListType, comment="List of filenames exported"
    )
    script_called: Mapped[bool] = mapped_column(
        default=False,
        comment=(
            f"Was the {ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} "
            f"script called?"
        ),
    )
    script_retcode: Mapped[Optional[int]] = mapped_column(
        comment=(
            f"Return code from the "
            f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script"
        ),
    )
    script_stdout: Mapped[Optional[str]] = mapped_column(
        UnicodeText,
        comment=(
            f"stdout from the "
            f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script"
        ),
    )
    script_stderr: Mapped[Optional[str]] = mapped_column(
        UnicodeText,
        comment=(
            f"stderr from the "
            f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script"
        ),
    )

    exported_task = relationship(ExportedTask)

    def __init__(
        self, exported_task: Optional[ExportedTask] = None, **kwargs: Any
    ) -> None:
        """
        Args:
            exported_task: :class:`ExportedTask` object
        """
        super().__init__(**kwargs)
        self.exported_task = exported_task

    def export_file(
        self,
        filename: str,
        text: Optional[str] = None,
        binary: Optional[bytes] = None,
        text_encoding: str = UTF8,
    ) -> bool:
        """
        Exports the file.

        On any failure (file exists and overwriting is disallowed; directory
        can't be made; file can't be written), the export is aborted with a
        reason and ``False`` is returned. On success, the (absolute) filename
        is recorded.

        Args:
            filename:
            text: text contents (specify this XOR ``binary``)
            binary: binary contents (specify this XOR ``text``)
            text_encoding: encoding to use when writing text

        Returns:
            bool: was it exported?
        """
        # Truthiness is what is checked here, so empty content counts as
        # "not specified".
        assert bool(text) != bool(binary), "Specify text XOR binary"
        exported_task = self.exported_task
        filename = os.path.abspath(filename)
        directory = os.path.dirname(filename)
        recipient = exported_task.recipient

        if not recipient.file_overwrite_files and os.path.isfile(filename):
            self.abort(f"File already exists: {filename!r}")
            return False

        if recipient.file_make_directory:
            try:
                mkdir_p(directory)
            except Exception as e:
                self.abort(f"Couldn't make directory {directory!r}: {e}")
                return False

        try:
            log.debug("Writing to {!r}", filename)
            if text:
                with open(filename, mode="w", encoding=text_encoding) as f:
                    f.write(text)
            else:
                with open(filename, mode="wb") as f:
                    f.write(binary)
        except Exception as e:
            self.abort(f"Failed to open or write file {filename!r}: {e}")
            return False

        self.note_exported_file(filename)
        return True

    def note_exported_file(self, *filenames: str) -> None:
        """
        Records a filename that has been exported, or several.

        Args:
            *filenames: filenames
        """
        if self.filenames is None:
            self.filenames = list(filenames)
        else:
            # Assign a new list rather than mutating in place, so the
            # attribute is marked as dirty and gets saved.
            # noinspection PyAugmentAssignment,PyTypeChecker
            self.filenames = self.filenames + list(filenames)

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Exports the task itself to a file.

        Writes the task in the recipient's chosen format (PDF/HTML/XML),
        optionally followed by a RiO metadata file, then runs the
        post-export script if one is configured.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            AssertionError: if the recipient's task format is not one of
                PDF/HTML/XML
        """
        exported_task = self.exported_task
        task = exported_task.task
        recipient = exported_task.recipient
        task_format = recipient.task_format
        task_filename = recipient.get_filename(req, task)
        rio_metadata_filename = change_filename_ext(
            task_filename, ".metadata"
        ).replace(" ", "")
        # ... in case we use it. No spaces in its filename.

        # Before we calculate the PDF, etc., we can pre-check for existing
        # files.
        if not recipient.file_overwrite_files:
            target_filenames = [task_filename]
            if recipient.file_export_rio_metadata:
                target_filenames.append(rio_metadata_filename)
            for fname in target_filenames:
                if os.path.isfile(os.path.abspath(fname)):
                    self.abort(f"File already exists: {fname!r}")
                    return

        # Export task
        if task_format == FileType.PDF:
            binary = task.get_pdf(req)
            text = None
        elif task_format == FileType.HTML:
            binary = None
            text = task.get_html(req)
        elif task_format == FileType.XML:
            binary = None
            text = task.get_xml(req)
        else:
            raise AssertionError("Unknown task_format")
        written = self.export_file(
            task_filename, text=text, binary=binary, text_encoding=UTF8
        )
        if not written:
            return

        # RiO metadata too?
        if recipient.file_export_rio_metadata:

            metadata = task.get_rio_metadata(
                req,
                recipient.rio_idnum,
                recipient.rio_uploading_user,
                recipient.rio_document_type,
            )
            # We're going to write in binary mode, to get the newlines right.
            # One way is:
            #     with codecs.open(filename, mode="w", encoding="ascii") as f:
            #         f.write(metadata.replace("\n", DOS_NEWLINE))
            # Here's another.
            metadata = metadata.replace("\n", DOS_NEWLINE)
            # ... Servelec say CR = "\r", but DOS is \r\n.
            metadata_binary = metadata.encode("ascii")
            # UTF-8 is NOT supported by RiO for metadata.
            written_metadata = self.export_file(
                rio_metadata_filename, binary=metadata_binary
            )
            if not written_metadata:
                return

        self.finish_run_script_if_necessary()

    def succeed(self) -> None:
        """
        Register success (on the parent :class:`ExportedTask`).
        """
        self.exported_task.succeed()

    def abort(self, msg: str) -> None:
        """
        Record failure, and why (on the parent :class:`ExportedTask`).

        (Called ``abort`` not ``fail`` because PyCharm has a bug relating to
        functions named ``fail``:
        https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

        Args:
            msg: why
        """
        self.exported_task.abort(msg)

    def finish_run_script_if_necessary(self) -> None:
        """
        Completes the file export by running the external script, if required.

        The script is run only if at least one file was exported and the
        recipient has a script configured; it receives the exported filenames
        as its arguments. Its return code, stdout and stderr are recorded.
        A non-zero return code is recorded but does not, by itself, mark the
        export as failed; only a failure to launch or communicate with the
        script does.
        """
        recipient = self.exported_task.recipient
        if self.filenames and recipient.file_script_after_export:
            args = [recipient.file_script_after_export] + self.filenames
            encoding = sys.getdefaultencoding()
            try:
                p = subprocess.Popen(
                    args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                )
                out, err = p.communicate()
            except Exception as e:
                self.script_called = False
                self.script_stdout = ""
                self.script_stderr = str(e)
                self.abort("Failed to run script")
                return
            # The script has definitely run by this point. Decode leniently:
            # undecodable output must not make a script that ran look as if
            # it was never called.
            self.script_called = True
            self.script_retcode = p.returncode
            self.script_stdout = out.decode(encoding, errors="replace")
            self.script_stderr = err.decode(encoding, errors="replace")
        self.succeed()
1071# =============================================================================
1072# E-mail export
1073# =============================================================================
class ExportedTaskEmail(Base):
    """
    ORM record of one task export performed by e-mail.

    Links an :class:`ExportedTask` to the :class:`Email` that carried it.
    """

    __tablename__ = "_exported_task_email"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    # The export attempt that this e-mail belongs to.
    exported_task_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportedTask.id),
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )
    # The e-mail itself; set by export_task().
    email_id: Mapped[Optional[int]] = mapped_column(
        BigInteger,
        ForeignKey(Email.id),
        comment=f"FK to {Email.__tablename__}.{Email.id.name}",
    )

    exported_task = relationship(ExportedTask)
    email = relationship(Email)

    def __init__(
        self, exported_task: ExportedTask = None, **kwargs: Any
    ) -> None:
        """
        Args:
            exported_task: the :class:`ExportedTask` that this e-mail export
                belongs to
            kwargs: passed on to the superclass constructor
        """
        super().__init__(**kwargs)
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Builds an e-mail with the task attached (as PDF, HTML or XML,
        according to the recipient's ``task_format``), sends it, and records
        success or failure on the associated exported task.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            AssertionError: if the recipient's task format is not one of
                PDF/HTML/XML
        """
        parent = self.exported_task
        task = parent.task
        recipient = parent.recipient
        task_format = recipient.task_format
        # Use only the final path component: a full path makes no sense as
        # the name of an e-mail attachment.
        attachment_name = os.path.basename(recipient.get_filename(req, task))
        charset = "utf8"

        # Render the task in the requested format.
        if task_format == FileType.PDF:
            payload = task.get_pdf(req)
        elif task_format == FileType.HTML:
            payload = task.get_html(req).encode(charset)
        elif task_format == FileType.XML:
            payload = task.get_xml(req).encode(charset)
        else:
            raise AssertionError("Unknown task_format")
        attachments = [
            (attachment_name, payload)
        ]  # type: List[Tuple[str, bytes]]

        if recipient.email_body_as_html:
            content_type = CONTENT_TYPE_HTML
        else:
            content_type = CONTENT_TYPE_TEXT

        # The e-mail's date is set automatically.
        message = Email(
            from_addr=recipient.email_from,
            sender=recipient.email_sender,
            reply_to=recipient.email_reply_to,
            to=recipient.email_to,
            cc=recipient.email_cc,
            bcc=recipient.email_bcc,
            subject=recipient.get_email_subject(req, task),
            body=recipient.get_email_body(req, task),
            content_type=content_type,
            charset=charset,
            attachments_binary=attachments,
            save_msg_string=recipient.email_keep_message,
        )
        self.email = message
        message.send(
            host=recipient.email_host,
            username=recipient.email_host_username,
            password=recipient.email_host_password,
            port=recipient.email_port,
            use_tls=recipient.email_use_tls,
        )

        if not message.sent:
            parent.abort("Failed to send e-mail")
            return
        parent.succeed()
1172# =============================================================================
1173# REDCap export
1174# =============================================================================
class ExportedTaskRedcap(Base):
    """
    Represents an individual REDCap export.

    Links an :class:`ExportedTask` to the REDCap record/instrument/instance
    to which the task was exported.
    """

    __tablename__ = "_exported_task_redcap"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    # The export attempt that this REDCap export belongs to.
    exported_task_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportedTask.id),
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )

    exported_task = relationship(ExportedTask)

    # We store these just as an audit trail

    # The column type is UnicodeText, so the Python-side type is str (the
    # annotation must agree with the explicit column type).
    redcap_record_id: Mapped[Optional[str]] = mapped_column(
        UnicodeText,
        comment=(
            "ID of the (patient) record on the REDCap instance where "
            "this task has been exported"
        ),
    )

    redcap_instrument_name: Mapped[Optional[str]] = mapped_column(
        UnicodeText,
        comment=(
            "The name of the REDCap instrument name (form) where this "
            "task has been exported"
        ),
    )

    redcap_instance_id: Mapped[Optional[int]] = mapped_column(
        comment=(
            "1-based index of this particular task within the patient "
            "record. Increments on every repeat attempt."
        ),
    )

    def __init__(
        self, exported_task: Optional[ExportedTask] = None, **kwargs: Any
    ) -> None:
        """
        Args:
            exported_task: the :class:`ExportedTask` that this REDCap export
                belongs to
            kwargs: passed on to the superclass constructor
        """
        super().__init__(**kwargs)
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Exports the task to REDCap, recording success or failure on the
        associated exported task.

        A :exc:`RedcapExportException` raised during the export is not
        propagated; its message is recorded as the reason for failure.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        exported_task = self.exported_task
        exporter = RedcapTaskExporter()

        try:
            exporter.export_task(req, self)
            exported_task.succeed()
        except RedcapExportException as e:
            exported_task.abort(str(e))
1248# =============================================================================
1249# FHIR export
1250# =============================================================================
class ExportedTaskFhir(Base):
    """
    ORM record of one task export performed via FHIR.

    Its ``entries`` are :class:`ExportedTaskFhirEntry` objects.
    """

    __tablename__ = "_exported_task_fhir"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )

    # The export attempt that this FHIR export belongs to.
    exported_task_id: Mapped[int] = mapped_column(
        BigInteger,
        ForeignKey(ExportedTask.id),
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )

    exported_task = relationship(ExportedTask)

    # Named by string because ExportedTaskFhirEntry is defined later.
    entries = relationship(
        "ExportedTaskFhirEntry", back_populates="exported_task_fhir"
    )

    def __init__(
        self, exported_task: ExportedTask = None, **kwargs: Any
    ) -> None:
        """
        Args:
            exported_task: the :class:`ExportedTask` that this FHIR export
                belongs to
            kwargs: passed on to the superclass constructor
        """
        super().__init__(**kwargs)
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Exports the task via FHIR, recording success or failure on the
        associated exported task.

        A :exc:`FhirExportException` is not propagated; its message is
        recorded as the reason for failure.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        parent = self.exported_task

        try:
            FhirTaskExporter(req, self).export_task()
            parent.succeed()
        except FhirExportException as exc:
            parent.abort(str(exc))
class ExportedTaskFhirEntry(Base):
    """
    Details of Patients, Questionnaires, QuestionnaireResponses exported to
    a FHIR server for a single task.
    """

    __tablename__ = "_exported_task_fhir_entry"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )

    # The FHIR export that this entry belongs to.
    exported_task_fhir_id: Mapped[int] = mapped_column(
        ForeignKey(ExportedTaskFhir.id),
        comment="FK to {}.{}".format(
            ExportedTaskFhir.__tablename__, ExportedTaskFhir.id.name
        ),
    )

    etag: Mapped[Optional[str]] = mapped_column(
        UnicodeText, comment="The ETag for the resource (if relevant)"
    )

    last_modified: Mapped[Optional[datetime.datetime]] = mapped_column(
        comment="Server's date/time modified."
    )

    location: Mapped[Optional[str]] = mapped_column(
        UnicodeText,
        comment="The location (if the operation returns a location).",
    )

    status: Mapped[Optional[str]] = mapped_column(
        UnicodeText, comment="Status response code (text optional)."
    )

    # TODO: outcome?

    # ExportedTaskFhir.entries names this attribute in its back_populates;
    # the reverse must be declared here too, or the two sides are only kept
    # in sync in one direction.
    exported_task_fhir = relationship(
        ExportedTaskFhir, back_populates="entries"
    )

    @property
    def location_url(self) -> str:
        """
        Puts the FHIR server API URL together with the returned location, so
        we can hyperlink to the resource.

        Returns:
            the URL, or an empty string if there is no location or the
            recipient's FHIR API URL cannot be determined. A location that
            is already an absolute HTTP(S) URL is returned unchanged.
        """
        location = self.location
        if not location:
            return ""
        if location.startswith(("http://", "https://")):
            # Already absolute; prefixing the API URL would corrupt it.
            return location
        try:
            api_url = (
                self.exported_task_fhir.exported_task.recipient.fhir_api_url
            )
        except AttributeError:
            return ""
        if not api_url:
            # posixpath.join() would raise TypeError on None.
            return ""
        # Avoid urllib.parse.urljoin; it does complex (and for our purposes
        # wrong) things. See
        # https://stackoverflow.com/questions/10893374/python-confusions-with-urljoin
        # Strip leading slashes from the location: posixpath.join() treats
        # a component starting with "/" as absolute and would discard
        # api_url altogether.
        return posixpath.join(api_url, location.lstrip("/"))