Coverage for cc_modules/cc_hl7.py: 13%
290 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
1# noinspection HttpUrlsUsage
2"""
3camcops_server/cc_modules/cc_hl7.py
5===============================================================================
7 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
8 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
10 This file is part of CamCOPS.
12 CamCOPS is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
17 CamCOPS is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License for more details.
22 You should have received a copy of the GNU General Public License
23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
25===============================================================================
27**Core HL7 functions, e.g. to build HL7 v2 messages.**
29General HL7 sources:
31- https://python-hl7.readthedocs.org/en/latest/
32- http://www.interfaceware.com/manual/v3gen_python_library_details.html
33- http://www.interfaceware.com/hl7_video_vault.html#how
34- http://www.interfaceware.com/hl7-standard/hl7-segments.html
35- https://www.hl7.org/special/committees/vocab/v26_appendix_a.pdf
36- https://www.ncbi.nlm.nih.gov/pmc/articles/PMC130066/
38To consider
40- batched messages (HL7 batching protocol);
41 https://docs.oracle.com/cd/E23943_01/user.1111/e23486/app_hl7batching.htm
42- note: DG1 segment = diagnosis
44Basic HL7 message structure:
46- can package into HL7 2.X message as encapsulated PDF;
47 https://www.hl7standards.com/blog/2007/11/27/pdf-attachment-in-hl7-message/
48- message ORU^R01
49 https://www.corepointhealth.com/resource-center/hl7-resources/hl7-messages
50- MESSAGES: http://www.interfaceware.com/hl7-standard/hl7-messages.html
51- OBX segment = observation/result segment;
52 https://www.corepointhealth.com/resource-center/hl7-resources/hl7-obx-segment;
53 http://www.interfaceware.com/hl7-standard/hl7-segment-OBX.html
54- SEGMENTS:
55 https://www.corepointhealth.com/resource-center/hl7-resources/hl7-segments
56- ED field (= encapsulated data);
57 http://www.interfaceware.com/hl7-standard/hl7-fields.html
58- base-64 encoding
60We can then add an option for structure (XML), HTML, PDF export.
62"""
64import base64
65import logging
66import socket
67from types import TracebackType
68from typing import List, Optional, Tuple, Type, TYPE_CHECKING, Union
70from cardinal_pythonlib.datetimefunc import format_datetime
71from cardinal_pythonlib.logs import BraceStyleAdapter
72import hl7
73from pendulum import Date, DateTime as Pendulum
75from camcops_server.cc_modules.cc_constants import DateFormat, FileType
76from camcops_server.cc_modules.cc_simpleobjects import HL7PatientIdentifier
78if TYPE_CHECKING:
79 from camcops_server.cc_modules.cc_request import CamcopsRequest
80 from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions
81 from camcops_server.cc_modules.cc_task import Task
83log = BraceStyleAdapter(logging.getLogger(__name__))
86# =============================================================================
87# Constants
88# =============================================================================
90# STRUCTURE OF HL7 MESSAGES
91# MESSAGE = list of segments, separated by carriage returns
92SEGMENT_SEPARATOR = "\r"
93# SEGMENT = list of fields (= composites), separated by pipes
94FIELD_SEPARATOR = "|"
95# FIELD (= COMPOSITE) = string, or list of components separated by carets
96COMPONENT_SEPARATOR = "^"
97# Component = string, or lists of subcomponents separated by ampersands
98SUBCOMPONENT_SEPARATOR = "&"
99# Subcomponents must be primitive data types (i.e. strings).
100# ... http://www.interfaceware.com/blog/hl7-composites/
102REPETITION_SEPARATOR = "~"
103ESCAPE_CHARACTER = "\\"
105# Fields are specified in terms of DATA TYPES:
106# http://www.corepointhealth.com/resource-center/hl7-resources/hl7-data-types
108# Some of those are COMPOSITE TYPES:
109# http://amisha.pragmaticdata.com/~gunther/oldhtml/composites.html#COMPOSITES
112# =============================================================================
113# HL7 helper functions
114# =============================================================================
117def get_mod11_checkdigit(strnum: str) -> str:
118 # noinspection HttpUrlsUsage
119 """
120 Input: string containing integer. Output: MOD11 check digit (string).
122 See:
124 - http://www.mexi.be/documents/hl7/ch200025.htm
125 - https://stackoverflow.com/questions/7006109
126 - http://www.pgrocer.net/Cis51/mod11.html
127 """
128 total = 0
129 multiplier = 2 # 2 for units digit, increases to 7, then resets to 2
130 try:
131 for i in reversed(range(len(strnum))):
132 total += int(strnum[i]) * multiplier
133 multiplier += 1
134 if multiplier == 8:
135 multiplier = 2
136 c = str(11 - (total % 11))
137 if c == "11":
138 c = "0"
139 elif c == "10":
140 c = "X"
141 return c
142 except (TypeError, ValueError):
143 # garbage in...
144 return ""
147def make_msh_segment(
148 message_datetime: Pendulum, message_control_id: str
149) -> hl7.Segment:
150 """
151 Creates an HL7 message header (MSH) segment.
153 - MSH: https://www.hl7.org/documentcenter/public/wg/conf/HL7MSH.htm
155 - We're making an ORU^R01 message = unsolicited result.
157 - ORU = Observational Report - Unsolicited
158 - ORU^R01 = Unsolicited transmission of an observation message
159 - https://www.corepointhealth.com/resource-center/hl7-resources/hl7-oru-message
160 - https://www.hl7kit.com/joomla/index.php/hl7resources/examples/107-orur01
161 """ # noqa
163 segment_id = "MSH"
164 encoding_characters = (
165 COMPONENT_SEPARATOR
166 + REPETITION_SEPARATOR
167 + ESCAPE_CHARACTER
168 + SUBCOMPONENT_SEPARATOR
169 )
170 sending_application = "CamCOPS"
171 sending_facility = ""
172 receiving_application = ""
173 receiving_facility = ""
174 date_time_of_message = format_datetime(
175 message_datetime, DateFormat.HL7_DATETIME
176 )
177 security = ""
178 message_type = hl7.Field(
179 COMPONENT_SEPARATOR,
180 [
181 "ORU", # message type ID = Observ result/unsolicited
182 "R01", # trigger event ID = ORU/ACK - Unsolicited transmission
183 # of an observation message
184 ],
185 )
186 processing_id = "P" # production (processing mode: current)
187 version_id = "2.3" # HL7 version
188 sequence_number = ""
189 continuation_pointer = ""
190 accept_acknowledgement_type = ""
191 application_acknowledgement_type = "AL" # always
192 country_code = ""
193 character_set = "UNICODE UTF-8"
194 # http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages
195 principal_language_of_message = ""
197 fields = [
198 segment_id,
199 # field separator inserted automatically; HL7 standard considers it a
200 # field but the python-hl7 processor doesn't when it parses
201 encoding_characters,
202 sending_application,
203 sending_facility,
204 receiving_application,
205 receiving_facility,
206 date_time_of_message,
207 security,
208 message_type,
209 message_control_id,
210 processing_id,
211 version_id,
212 sequence_number,
213 continuation_pointer,
214 accept_acknowledgement_type,
215 application_acknowledgement_type,
216 country_code,
217 character_set,
218 principal_language_of_message,
219 ]
220 segment = hl7.Segment(FIELD_SEPARATOR, fields)
221 return segment
224def make_pid_segment(
225 forename: str,
226 surname: str,
227 dob: Date,
228 sex: str,
229 address: str,
230 patient_id_list: List[HL7PatientIdentifier] = None,
231) -> hl7.Segment:
232 """
233 Creates an HL7 patient identification (PID) segment.
235 - https://www.corepointhealth.com/resource-center/hl7-resources/hl7-pid-segment
236 - https://www.hl7.org/documentcenter/public/wg/conf/Msgadt.pdf (s5.4.8)
238 - ID numbers...
239 https://www.cdc.gov/vaccines/programs/iis/technical-guidance/downloads/hl7guide-1-4-2012-08.pdf
240 """ # noqa
242 patient_id_list = patient_id_list or [] # type: List[HL7PatientIdentifier]
244 segment_id = "PID"
245 set_id = ""
247 # External ID
248 patient_external_id = ""
249 # ... this one is deprecated
250 # http://www.j4jayant.com/articles/hl7/16-patient-id
252 # Internal ID
253 internal_id_element_list = []
254 for i in range(len(patient_id_list)):
255 if not patient_id_list[i].pid:
256 continue
257 ptidentifier = patient_id_list[i]
258 pid = ptidentifier.pid
259 check_digit = get_mod11_checkdigit(pid)
260 check_digit_scheme = "M11" # Mod 11 algorithm
261 type_id = patient_id_list[i].id_type
262 assigning_authority = patient_id_list[i].assigning_authority
263 # Now, as per Table 4.6 "Extended composite ID" of
264 # hl7guide-1-4-2012-08.pdf:
265 internal_id_element = hl7.Field(
266 COMPONENT_SEPARATOR,
267 [
268 pid,
269 check_digit,
270 check_digit_scheme,
271 assigning_authority,
272 type_id, # length "2..5" meaning 2-5
273 ],
274 )
275 internal_id_element_list.append(internal_id_element)
276 patient_internal_id = hl7.Field(
277 REPETITION_SEPARATOR, internal_id_element_list
278 )
280 # Alternate ID
281 alternate_patient_id = ""
282 # ... this one is deprecated
283 # http://www.j4jayant.com/articles/hl7/16-patient-id
285 patient_name = hl7.Field(
286 COMPONENT_SEPARATOR,
287 [
288 forename, # surname
289 surname, # forename
290 "", # middle initial/name
291 "", # suffix (e.g. Jr, III)
292 "", # prefix (e.g. Dr)
293 "", # degree (e.g. MD)
294 ],
295 )
296 mothers_maiden_name = ""
297 date_of_birth = format_datetime(dob, DateFormat.HL7_DATE)
298 alias = ""
299 race = ""
300 country_code = ""
301 home_phone_number = ""
302 business_phone_number = ""
303 language = ""
304 marital_status = ""
305 religion = ""
306 account_number = ""
307 social_security_number = ""
308 drivers_license_number = ""
309 mother_identifier = ""
310 ethnic_group = ""
311 birthplace = ""
312 birth_order = ""
313 citizenship = ""
314 veterans_military_status = ""
316 fields = [
317 segment_id,
318 set_id, # PID.1
319 patient_external_id, # PID.2
320 patient_internal_id, # known as "PID-3" or "PID.3"
321 alternate_patient_id, # PID.4
322 patient_name,
323 mothers_maiden_name,
324 date_of_birth,
325 sex,
326 alias,
327 race,
328 address,
329 country_code,
330 home_phone_number,
331 business_phone_number,
332 language,
333 marital_status,
334 religion,
335 account_number,
336 social_security_number,
337 drivers_license_number,
338 mother_identifier,
339 ethnic_group,
340 birthplace,
341 birth_order,
342 citizenship,
343 veterans_military_status,
344 ]
345 segment = hl7.Segment(FIELD_SEPARATOR, fields)
346 return segment
349# noinspection PyUnusedLocal
350def make_obr_segment(task: "Task") -> hl7.Segment:
351 # noinspection HttpUrlsUsage
352 """
353 Creates an HL7 observation request (OBR) segment.
355 - http://hl7reference.com/HL7%20Specifications%20ORM-ORU.PDF
356 - Required in ORU^R01 message:
358 - https://www.corepointhealth.com/resource-center/hl7-resources/hl7-oru-message
359 - https://www.corepointhealth.com/resource-center/hl7-resources/hl7-obr-segment
360 """ # noqa
362 segment_id = "OBR"
363 set_id = "1"
364 placer_order_number = "CamCOPS"
365 filler_order_number = "CamCOPS"
366 universal_service_id = hl7.Field(
367 COMPONENT_SEPARATOR,
368 ["CamCOPS", "CamCOPS psychiatric/cognitive assessment"],
369 )
370 # unused below here, apparently
371 priority = ""
372 requested_date_time = ""
373 observation_date_time = ""
374 observation_end_date_time = ""
375 collection_volume = ""
376 collector_identifier = ""
377 specimen_action_code = ""
378 danger_code = ""
379 relevant_clinical_information = ""
380 specimen_received_date_time = ""
381 ordering_provider = ""
382 order_callback_phone_number = ""
383 placer_field_1 = ""
384 placer_field_2 = ""
385 filler_field_1 = ""
386 filler_field_2 = ""
387 results_report_status_change_date_time = ""
388 charge_to_practice = ""
389 diagnostic_service_section_id = ""
390 result_status = ""
391 parent_result = ""
392 quantity_timing = ""
393 result_copies_to = ""
394 parent = ""
395 transportation_mode = ""
396 reason_for_study = ""
397 principal_result_interpreter = ""
398 assistant_result_interpreter = ""
399 technician = ""
400 transcriptionist = ""
401 scheduled_date_time = ""
402 number_of_sample_containers = ""
403 transport_logistics_of_collected_samples = ""
404 collectors_comment = ""
405 transport_arrangement_responsibility = ""
406 transport_arranged = ""
407 escort_required = ""
408 planned_patient_transport_comment = ""
410 fields = [
411 segment_id,
412 set_id,
413 placer_order_number,
414 filler_order_number,
415 universal_service_id,
416 priority,
417 requested_date_time,
418 observation_date_time,
419 observation_end_date_time,
420 collection_volume,
421 collector_identifier,
422 specimen_action_code,
423 danger_code,
424 relevant_clinical_information,
425 specimen_received_date_time,
426 ordering_provider,
427 order_callback_phone_number,
428 placer_field_1,
429 placer_field_2,
430 filler_field_1,
431 filler_field_2,
432 results_report_status_change_date_time,
433 charge_to_practice,
434 diagnostic_service_section_id,
435 result_status,
436 parent_result,
437 quantity_timing,
438 result_copies_to,
439 parent,
440 transportation_mode,
441 reason_for_study,
442 principal_result_interpreter,
443 assistant_result_interpreter,
444 technician,
445 transcriptionist,
446 scheduled_date_time,
447 number_of_sample_containers,
448 transport_logistics_of_collected_samples,
449 collectors_comment,
450 transport_arrangement_responsibility,
451 transport_arranged,
452 escort_required,
453 planned_patient_transport_comment,
454 ]
455 segment = hl7.Segment(FIELD_SEPARATOR, fields)
456 return segment
459def make_obx_segment(
460 req: "CamcopsRequest",
461 task: "Task",
462 task_format: str,
463 observation_identifier: str,
464 observation_datetime: Pendulum,
465 responsible_observer: str,
466 export_options: "TaskExportOptions",
467) -> hl7.Segment:
468 # noinspection HttpUrlsUsage
469 """
470 Creates an HL7 observation result (OBX) segment.
472 - http://www.hl7standards.com/blog/2006/10/18/how-do-i-send-a-binary-file-inside-of-an-hl7-message
473 - http://www.hl7standards.com/blog/2007/11/27/pdf-attachment-in-hl7-message/
474 - http://www.hl7standards.com/blog/2006/12/01/sending-images-or-formatted-documents-via-hl7-messaging/
475 - https://www.hl7.org/documentcenter/public/wg/ca/HL7ClmAttIG.PDF
476 - type of data:
477 https://www.hl7.org/implement/standards/fhir/v2/0191/index.html
478 - subtype of data:
479 https://www.hl7.org/implement/standards/fhir/v2/0291/index.html
480 """ # noqa
482 segment_id = "OBX"
483 set_id = str(1)
485 source_application = "CamCOPS"
486 if task_format == FileType.PDF:
487 value_type = "ED" # Encapsulated data (ED) field
488 observation_value = hl7.Field(
489 COMPONENT_SEPARATOR,
490 [
491 source_application,
492 "Application", # type of data
493 "PDF", # data subtype
494 "Base64", # base 64 encoding
495 base64.standard_b64encode(task.get_pdf(req)), # data
496 ],
497 )
498 elif task_format == FileType.HTML:
499 value_type = "ED" # Encapsulated data (ED) field
500 observation_value = hl7.Field(
501 COMPONENT_SEPARATOR,
502 [
503 source_application,
504 "TEXT", # type of data
505 "HTML", # data subtype
506 "A", # no encoding (see table 0299), but need to escape
507 escape_hl7_text(task.get_html(req)), # data
508 ],
509 )
510 elif task_format == FileType.XML:
511 value_type = "ED" # Encapsulated data (ED) field
512 observation_value = hl7.Field(
513 COMPONENT_SEPARATOR,
514 [
515 source_application,
516 "TEXT", # type of data
517 "XML", # data subtype
518 "A", # no encoding (see table 0299), but need to escape
519 escape_hl7_text(
520 task.get_xml(
521 req, indent_spaces=0, eol="", options=export_options
522 )
523 ), # data
524 ],
525 )
526 else:
527 raise AssertionError(
528 f"make_obx_segment: invalid task_format: {task_format}"
529 )
531 observation_sub_id = ""
532 units = ""
533 reference_range = ""
534 abnormal_flags = ""
535 probability = ""
536 nature_of_abnormal_test = ""
537 observation_result_status = ""
538 date_of_last_observation_normal_values = ""
539 user_defined_access_checks = ""
540 date_and_time_of_observation = format_datetime(
541 observation_datetime, DateFormat.HL7_DATETIME
542 )
543 producer_id = ""
544 observation_method = ""
545 equipment_instance_identifier = ""
546 date_time_of_analysis = ""
548 fields = [
549 segment_id,
550 set_id,
551 value_type,
552 observation_identifier,
553 observation_sub_id,
554 observation_value,
555 units,
556 reference_range,
557 abnormal_flags,
558 probability,
559 nature_of_abnormal_test,
560 observation_result_status,
561 date_of_last_observation_normal_values,
562 user_defined_access_checks,
563 date_and_time_of_observation,
564 producer_id,
565 responsible_observer,
566 observation_method,
567 equipment_instance_identifier,
568 date_time_of_analysis,
569 ]
570 segment = hl7.Segment(FIELD_SEPARATOR, fields)
571 return segment
574def make_dg1_segment(
575 set_id: int,
576 diagnosis_datetime: Pendulum,
577 coding_system: str,
578 diagnosis_identifier: str,
579 diagnosis_text: str,
580 alternate_coding_system: str = "",
581 alternate_diagnosis_identifier: str = "",
582 alternate_diagnosis_text: str = "",
583 diagnosis_type: str = "F",
584 diagnosis_classification: str = "D",
585 confidential_indicator: str = "N",
586 clinician_id_number: Union[str, int] = None,
587 clinician_surname: str = "",
588 clinician_forename: str = "",
589 clinician_middle_name_or_initial: str = "",
590 clinician_suffix: str = "",
591 clinician_prefix: str = "",
592 clinician_degree: str = "",
593 clinician_source_table: str = "",
594 clinician_assigning_authority: str = "",
595 clinician_name_type_code: str = "",
596 clinician_identifier_type_code: str = "",
597 clinician_assigning_facility: str = "",
598 attestation_datetime: Pendulum = None,
599) -> hl7.Segment:
600 # noinspection HttpUrlsUsage
601 """
602 Creates an HL7 diagnosis (DG1) segment.
604 Args:
606 .. code-block:: none
608 set_id: Diagnosis sequence number, starting with 1 (use higher numbers
609 for >1 diagnosis).
610 diagnosis_datetime: Date/time diagnosis was made.
612 coding_system: E.g. "I9C" for ICD9-CM; "I10" for ICD10.
613 diagnosis_identifier: Code.
614 diagnosis_text: Text.
616 alternate_coding_system: Optional alternate coding system.
617 alternate_diagnosis_identifier: Optional alternate code.
618 alternate_diagnosis_text: Optional alternate text.
620 diagnosis_type: A admitting, W working, F final.
621 diagnosis_classification: C consultation, D diagnosis, M medication,
622 O other, R radiological scheduling, S sign and symptom,
623 T tissue diagnosis, I invasive procedure not classified elsewhere.
624 confidential_indicator: Y yes, N no
626 clinician_id_number: } Diagnosing clinician.
627 clinician_surname: }
628 clinician_forename: }
629 clinician_middle_name_or_initial: }
630 clinician_suffix: }
631 clinician_prefix: }
632 clinician_degree: }
633 clinician_source_table: }
634 clinician_assigning_authority: }
635 clinician_name_type_code: }
636 clinician_identifier_type_code: }
637 clinician_assigning_facility: }
639 attestation_datetime: Date/time the diagnosis was attested.
641 - http://www.mexi.be/documents/hl7/ch600012.htm
642 - https://www.hl7.org/special/committees/vocab/V26_Appendix_A.pdf
643 """
645 segment_id = "DG1"
646 try:
647 int(set_id)
648 set_id = str(set_id) # type: ignore[assignment]
649 except Exception:
650 raise AssertionError("make_dg1_segment: set_id invalid")
651 diagnosis_coding_method = ""
652 diagnosis_code = hl7.Field(
653 COMPONENT_SEPARATOR,
654 [
655 diagnosis_identifier,
656 diagnosis_text,
657 coding_system,
658 alternate_diagnosis_identifier,
659 alternate_diagnosis_text,
660 alternate_coding_system,
661 ],
662 )
663 diagnosis_description = ""
664 diagnosis_datetime = format_datetime(
665 diagnosis_datetime, DateFormat.HL7_DATETIME
666 )
667 if diagnosis_type not in ("A", "W", "F"):
668 raise AssertionError("make_dg1_segment: diagnosis_type invalid")
669 major_diagnostic_category = ""
670 diagnostic_related_group = ""
671 drg_approval_indicator = ""
672 drg_grouper_review_code = ""
673 outlier_type = ""
674 outlier_days = ""
675 outlier_cost = ""
676 grouper_version_and_type = ""
677 diagnosis_priority = ""
679 try:
680 clinician_id_number = (
681 str(int(clinician_id_number))
682 if clinician_id_number is not None
683 else ""
684 )
685 except Exception:
686 raise AssertionError(
687 "make_dg1_segment: diagnosing_clinician_id_number" " invalid"
688 )
689 if clinician_id_number:
690 clinician_id_check_digit = get_mod11_checkdigit(clinician_id_number)
691 clinician_checkdigit_scheme = "M11" # Mod 11 algorithm
692 else:
693 clinician_id_check_digit = ""
694 clinician_checkdigit_scheme = ""
695 diagnosing_clinician = hl7.Field(
696 COMPONENT_SEPARATOR,
697 [
698 clinician_id_number,
699 clinician_surname or "",
700 clinician_forename or "",
701 clinician_middle_name_or_initial or "",
702 clinician_suffix or "",
703 clinician_prefix or "",
704 clinician_degree or "",
705 clinician_source_table or "",
706 clinician_assigning_authority or "",
707 clinician_name_type_code or "",
708 clinician_id_check_digit or "",
709 clinician_checkdigit_scheme or "",
710 clinician_identifier_type_code or "",
711 clinician_assigning_facility or "",
712 ],
713 )
715 if diagnosis_classification not in (
716 "C",
717 "D",
718 "M",
719 "O",
720 "R",
721 "S",
722 "T",
723 "I",
724 ):
725 raise AssertionError(
726 "make_dg1_segment: diagnosis_classification invalid"
727 )
728 if confidential_indicator not in ("Y", "N"):
729 raise AssertionError(
730 "make_dg1_segment: confidential_indicator invalid"
731 )
732 attestation_datetime = (
733 format_datetime(attestation_datetime, DateFormat.HL7_DATETIME)
734 if attestation_datetime
735 else ""
736 )
738 fields = [
739 segment_id,
740 set_id,
741 diagnosis_coding_method,
742 diagnosis_code,
743 diagnosis_description,
744 diagnosis_datetime,
745 diagnosis_type,
746 major_diagnostic_category,
747 diagnostic_related_group,
748 drg_approval_indicator,
749 drg_grouper_review_code,
750 outlier_type,
751 outlier_days,
752 outlier_cost,
753 grouper_version_and_type,
754 diagnosis_priority,
755 diagnosing_clinician,
756 diagnosis_classification,
757 confidential_indicator,
758 attestation_datetime,
759 ]
760 segment = hl7.Segment(FIELD_SEPARATOR, fields)
761 return segment
764def escape_hl7_text(s: str) -> str:
765 # noinspection HttpUrlsUsage
766 """
767 Escapes HL7 special characters.
769 - http://www.mexi.be/documents/hl7/ch200034.htm
770 - http://www.mexi.be/documents/hl7/ch200071.htm
771 """
772 esc_escape = ESCAPE_CHARACTER + ESCAPE_CHARACTER + ESCAPE_CHARACTER
773 esc_fieldsep = ESCAPE_CHARACTER + "F" + ESCAPE_CHARACTER
774 esc_componentsep = ESCAPE_CHARACTER + "S" + ESCAPE_CHARACTER
775 esc_subcomponentsep = ESCAPE_CHARACTER + "T" + ESCAPE_CHARACTER
776 esc_repetitionsep = ESCAPE_CHARACTER + "R" + ESCAPE_CHARACTER
778 # Linebreaks:
779 # http://www.healthintersections.com.au/?p=344
780 # https://groups.google.com/forum/#!topic/ensemble-in-healthcare/wP2DWMeFrPA # noqa
781 # http://www.hermetechnz.com/documentation/sqlschema/index.html?hl7_escape_rules.htm # noqa
782 esc_linebreak = ESCAPE_CHARACTER + ".br" + ESCAPE_CHARACTER
784 s = s.replace(ESCAPE_CHARACTER, esc_escape) # this one first!
785 s = s.replace(FIELD_SEPARATOR, esc_fieldsep)
786 s = s.replace(COMPONENT_SEPARATOR, esc_componentsep)
787 s = s.replace(SUBCOMPONENT_SEPARATOR, esc_subcomponentsep)
788 s = s.replace(REPETITION_SEPARATOR, esc_repetitionsep)
789 s = s.replace("\n", esc_linebreak)
790 return s
793def msg_is_successful_ack(msg: hl7.Message) -> Tuple[bool, Optional[str]]:
794 # noinspection HttpUrlsUsage
795 """
796 Checks whether msg represents a successful acknowledgement message.
798 - http://hl7reference.com/HL7%20Specifications%20ORM-ORU.PDF
799 """
801 if msg is None:
802 return False, "Reply is None"
804 # Get segments (MSH, MSA)
805 if len(msg) != 2:
806 return False, f"Reply doesn't have 2 segments (has {len(msg)})"
807 msh_segment = msg[0]
808 msa_segment = msg[1]
810 # Check MSH segment
811 if len(msh_segment) < 9:
812 return (
813 False,
814 f"First (MSH) segment has <9 fields (has {len(msh_segment)})",
815 )
816 msh_segment_id = msh_segment[0]
817 msh_message_type = msh_segment[8]
818 if msh_segment_id != ["MSH"]:
819 return (
820 False,
821 f"First (MSH) segment ID is not 'MSH' (is {msh_segment_id})",
822 )
823 if msh_message_type != ["ACK"]:
824 return (
825 False,
826 f"MSH message type is not 'ACK' (is {msh_message_type})",
827 )
829 # Check MSA segment
830 if len(msa_segment) < 2:
831 return (
832 False,
833 f"Second (MSA) segment has <2 fields (has {len(msa_segment)})",
834 )
835 msa_segment_id = msa_segment[0]
836 msa_acknowledgment_code = msa_segment[1]
837 if msa_segment_id != ["MSA"]:
838 return (
839 False,
840 f"Second (MSA) segment ID is not 'MSA' (is {msa_segment_id})",
841 )
842 if msa_acknowledgment_code != ["AA"]:
843 # AA for success, AE for error
844 return (
845 False,
846 (
847 f"MSA acknowledgement code is not 'AA' "
848 f"(is {msa_acknowledgment_code})"
849 ),
850 )
852 return True, None
855# =============================================================================
856# MLLPTimeoutClient
857# =============================================================================
858# Modification of MLLPClient from python-hl7, to allow timeouts and failure.
860SB = "\x0b" # <SB>, vertical tab
861EB = "\x1c" # <EB>, file separator
862CR = "\x0d" # <CR>, \r
863FF = "\x0c" # <FF>, new page form feed
865RECV_BUFFER = 4096
868class MLLPTimeoutClient(object):
869 """
870 Class for MLLP TCP/IP transmission that implements timeouts.
871 """
873 def __init__(self, host: str, port: int, timeout_ms: int = None) -> None:
874 """Creates MLLP client and opens socket."""
875 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
876 timeout_s = (
877 float(timeout_ms) / float(1000) if timeout_ms is not None else None
878 )
879 self.socket.settimeout(timeout_s)
880 self.socket.connect((host, port))
881 self.encoding = "utf-8"
883 def __enter__(self) -> "MLLPTimeoutClient":
884 """
885 For use with "with" statement.
886 """
887 return self
889 # noinspection PyUnusedLocal
890 def __exit__(
891 self,
892 exc_type: Optional[Type[BaseException]],
893 exc_val: Optional[BaseException],
894 traceback: Optional[TracebackType],
895 ) -> Optional[bool]:
896 """
897 For use with "with" statement.
898 """
899 self.close()
901 return None
903 def close(self) -> None:
904 """
905 Release the socket connection.
906 """
907 self.socket.close()
909 def send_message(
910 self, message: Union[str, hl7.Message]
911 ) -> Tuple[bool, Optional[str]]:
912 """
913 Wraps a string or :class:`hl7.Message` in a MLLP container
914 and sends the message to the server.
916 Returns ``success, ack_msg``.
917 """
918 if isinstance(message, hl7.Message):
919 message = str(message)
920 # wrap in MLLP message container
921 data = SB + message + CR + EB + CR
922 # ... the CR immediately after the message is my addition, because
923 # HL7 Inspector otherwise says: "Warning: last segment have no segment
924 # termination char 0x0d !" (sic).
925 return self.send(data.encode(self.encoding))
927 def send(self, data: bytes) -> Tuple[bool, Optional[str]]:
928 """
929 Low-level, direct access to the ``socket.send`` function (data must be
930 already wrapped in an MLLP container). Blocks until the server
931 returns.
933 Returns ``success, ack_msg``.
934 """
935 # upload the data
936 self.socket.send(data)
937 # wait for the ACK/NACK
938 try:
939 ack_msg = self.socket.recv(RECV_BUFFER).decode(self.encoding)
940 return True, ack_msg
941 except socket.timeout:
942 return False, None