Coverage for cc_modules/cc_hl7.py: 13%

290 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

1# noinspection HttpUrlsUsage 

2""" 

3camcops_server/cc_modules/cc_hl7.py 

4 

5=============================================================================== 

6 

7 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

8 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

9 

10 This file is part of CamCOPS. 

11 

12 CamCOPS is free software: you can redistribute it and/or modify 

13 it under the terms of the GNU General Public License as published by 

14 the Free Software Foundation, either version 3 of the License, or 

15 (at your option) any later version. 

16 

17 CamCOPS is distributed in the hope that it will be useful, 

18 but WITHOUT ANY WARRANTY; without even the implied warranty of 

19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20 GNU General Public License for more details. 

21 

22 You should have received a copy of the GNU General Public License 

23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

24 

25=============================================================================== 

26 

27**Core HL7 functions, e.g. to build HL7 v2 messages.** 

28 

29General HL7 sources: 

30 

31- https://python-hl7.readthedocs.org/en/latest/ 

32- http://www.interfaceware.com/manual/v3gen_python_library_details.html 

33- http://www.interfaceware.com/hl7_video_vault.html#how 

34- http://www.interfaceware.com/hl7-standard/hl7-segments.html 

35- https://www.hl7.org/special/committees/vocab/v26_appendix_a.pdf 

36- https://www.ncbi.nlm.nih.gov/pmc/articles/PMC130066/ 

37 

38To consider 

39 

40- batched messages (HL7 batching protocol); 

41 https://docs.oracle.com/cd/E23943_01/user.1111/e23486/app_hl7batching.htm 

42- note: DG1 segment = diagnosis 

43 

44Basic HL7 message structure: 

45 

46- can package into HL7 2.X message as encapsulated PDF; 

47 https://www.hl7standards.com/blog/2007/11/27/pdf-attachment-in-hl7-message/ 

48- message ORU^R01 

49 https://www.corepointhealth.com/resource-center/hl7-resources/hl7-messages 

50- MESSAGES: http://www.interfaceware.com/hl7-standard/hl7-messages.html 

51- OBX segment = observation/result segment; 

52 https://www.corepointhealth.com/resource-center/hl7-resources/hl7-obx-segment; 

53 http://www.interfaceware.com/hl7-standard/hl7-segment-OBX.html 

54- SEGMENTS: 

55 https://www.corepointhealth.com/resource-center/hl7-resources/hl7-segments 

56- ED field (= encapsulated data); 

57 http://www.interfaceware.com/hl7-standard/hl7-fields.html 

58- base-64 encoding 

59 

60We can then add an option for structure (XML), HTML, PDF export. 

61 

62""" 

63 

64import base64 

65import logging 

66import socket 

67from types import TracebackType 

68from typing import List, Optional, Tuple, Type, TYPE_CHECKING, Union 

69 

70from cardinal_pythonlib.datetimefunc import format_datetime 

71from cardinal_pythonlib.logs import BraceStyleAdapter 

72import hl7 

73from pendulum import Date, DateTime as Pendulum 

74 

75from camcops_server.cc_modules.cc_constants import DateFormat, FileType 

76from camcops_server.cc_modules.cc_simpleobjects import HL7PatientIdentifier 

77 

78if TYPE_CHECKING: 

79 from camcops_server.cc_modules.cc_request import CamcopsRequest 

80 from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions 

81 from camcops_server.cc_modules.cc_task import Task 

82 

83log = BraceStyleAdapter(logging.getLogger(__name__)) 

84 

85 

86# ============================================================================= 

87# Constants 

88# ============================================================================= 

89 

# HL7 v2 messages are built as a hierarchy of delimited lists:
#
#   message      = segments, each terminated by a carriage return
#   segment      = fields (also called composites), separated by pipes
#   field        = a string, or components separated by carets
#   component    = a string, or subcomponents separated by ampersands
#   subcomponent = always a primitive (string) value
#
# ... http://www.interfaceware.com/blog/hl7-composites/
SEGMENT_SEPARATOR = "\r"
FIELD_SEPARATOR = "|"
COMPONENT_SEPARATOR = "^"
SUBCOMPONENT_SEPARATOR = "&"

# A field may repeat; repeats are separated by tildes.
REPETITION_SEPARATOR = "~"
# Introduces and terminates an escape sequence (a single backslash).
ESCAPE_CHARACTER = "\\"

104 

105# Fields are specified in terms of DATA TYPES: 

106# http://www.corepointhealth.com/resource-center/hl7-resources/hl7-data-types 

107 

108# Some of those are COMPOSITE TYPES: 

109# http://amisha.pragmaticdata.com/~gunther/oldhtml/composites.html#COMPOSITES 

110 

111 

112# ============================================================================= 

113# HL7 helper functions 

114# ============================================================================= 

115 

116 

def get_mod11_checkdigit(strnum: str) -> str:
    # noinspection HttpUrlsUsage
    """
    Calculates a MOD11 check digit for a string of decimal digits.

    Digits are weighted from the right-hand (units) end: the units digit has
    weight 2, the next has 3, and so on up to 7, after which the weights
    restart at 2. The check value is 11 minus (the weighted sum modulo 11).

    Args:
        strnum: string containing an integer, e.g. ``"12345"``

    Returns:
        the check digit as a string: ``"0"`` to ``"9"``, or ``"X"`` when the
        check value is 10. A check value of 11 (weighted sum divisible by 11,
        which includes the empty string) is reported as ``"0"``. If the input
        cannot be read as a sequence of digits, returns ``""``.

    See:

    - http://www.mexi.be/documents/hl7/ch200025.htm
    - https://stackoverflow.com/questions/7006109
    - http://www.pgrocer.net/Cis51/mod11.html
    """
    try:
        # position 0 is the units digit; weights run 2, 3, ..., 7, 2, 3, ...
        total = sum(
            int(digit) * (2 + position % 6)
            for position, digit in enumerate(reversed(strnum))
        )
    except (TypeError, ValueError):
        # garbage in...
        return ""
    check_value = 11 - (total % 11)  # always in the range 1-11
    if check_value == 11:
        return "0"
    if check_value == 10:
        return "X"
    return str(check_value)

145 

146 

def make_msh_segment(
    message_datetime: Pendulum, message_control_id: str
) -> hl7.Segment:
    """
    Builds the message header (MSH) segment for an HL7 ORU^R01 message.

    - MSH: https://www.hl7.org/documentcenter/public/wg/conf/HL7MSH.htm

    - The message type is ORU^R01 = unsolicited result.

      - ORU = Observational Report - Unsolicited
      - ORU^R01 = Unsolicited transmission of an observation message
      - https://www.corepointhealth.com/resource-center/hl7-resources/hl7-oru-message
      - https://www.hl7kit.com/joomla/index.php/hl7resources/examples/107-orur01

    Args:
        message_datetime: date/time of the message; written in the
            ``DateFormat.HL7_DATETIME`` format
        message_control_id: message control ID, inserted unchanged

    Returns:
        the MSH segment
    """  # noqa

    encoding_characters = "".join(
        (
            COMPONENT_SEPARATOR,
            REPETITION_SEPARATOR,
            ESCAPE_CHARACTER,
            SUBCOMPONENT_SEPARATOR,
        )
    )
    timestamp = format_datetime(message_datetime, DateFormat.HL7_DATETIME)
    message_type = hl7.Field(
        COMPONENT_SEPARATOR,
        [
            "ORU",  # message type ID = Observ result/unsolicited
            "R01",  # trigger event ID = ORU/ACK - Unsolicited transmission
            # of an observation message
        ],
    )
    # http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages
    character_set = "UNICODE UTF-8"

    return hl7.Segment(
        FIELD_SEPARATOR,
        [
            "MSH",  # segment ID
            # field separator inserted automatically; HL7 standard considers
            # it a field but the python-hl7 processor doesn't when it parses
            encoding_characters,
            "CamCOPS",  # sending application
            "",  # sending facility
            "",  # receiving application
            "",  # receiving facility
            timestamp,  # date/time of message
            "",  # security
            message_type,
            message_control_id,
            "P",  # processing ID: production (processing mode: current)
            "2.3",  # version ID: HL7 version
            "",  # sequence number
            "",  # continuation pointer
            "",  # accept acknowledgement type
            "AL",  # application acknowledgement type: always
            "",  # country code
            character_set,
            "",  # principal language of message
        ],
    )

222 

223 

def make_pid_segment(
    forename: str,
    surname: str,
    dob: Date,
    sex: str,
    address: str,
    patient_id_list: Optional[List[HL7PatientIdentifier]] = None,
) -> hl7.Segment:
    """
    Creates an HL7 patient identification (PID) segment.

    - https://www.corepointhealth.com/resource-center/hl7-resources/hl7-pid-segment
    - https://www.hl7.org/documentcenter/public/wg/conf/Msgadt.pdf (s5.4.8)

    - ID numbers...
      https://www.cdc.gov/vaccines/programs/iis/technical-guidance/downloads/hl7guide-1-4-2012-08.pdf

    Args:
        forename: first component written to the patient name field
        surname: second component written to the patient name field
        dob: date of birth; written in the ``DateFormat.HL7_DATE`` format
        sex: inserted unchanged
        address: inserted unchanged
        patient_id_list: identifiers for the "internal ID" field (PID.3);
            entries with a false-valued ``pid`` are skipped. ``None`` is
            treated as an empty list.

    Returns:
        the PID segment
    """  # noqa

    # Internal ID (PID.3): one repetition per identifier that has a value.
    # Component order as per Table 4.6 "Extended composite ID" of
    # hl7guide-1-4-2012-08.pdf.
    internal_id_element_list = [
        hl7.Field(
            COMPONENT_SEPARATOR,
            [
                ptidentifier.pid,
                get_mod11_checkdigit(ptidentifier.pid),
                "M11",  # check digit scheme: Mod 11 algorithm
                ptidentifier.assigning_authority,
                ptidentifier.id_type,  # length "2..5" meaning 2-5
            ],
        )
        for ptidentifier in (patient_id_list or [])
        if ptidentifier.pid
    ]
    patient_internal_id = hl7.Field(
        REPETITION_SEPARATOR, internal_id_element_list
    )

    # NOTE(review): the original inline comments labelled the first name
    # component "surname" and the second "forename", but the arguments are
    # passed the other way round. The output order is deliberately left
    # unchanged here; check how callers populate these two arguments before
    # altering it.
    patient_name = hl7.Field(
        COMPONENT_SEPARATOR,
        [
            forename,
            surname,
            "",  # middle initial/name
            "",  # suffix (e.g. Jr, III)
            "",  # prefix (e.g. Dr)
            "",  # degree (e.g. MD)
        ],
    )
    date_of_birth = format_datetime(dob, DateFormat.HL7_DATE)

    fields = [
        "PID",  # segment ID
        "",  # PID.1: set ID
        # External and alternate IDs are deprecated:
        # http://www.j4jayant.com/articles/hl7/16-patient-id
        "",  # PID.2: patient external ID
        patient_internal_id,  # known as "PID-3" or "PID.3"
        "",  # PID.4: alternate patient ID
        patient_name,
        "",  # mother's maiden name
        date_of_birth,
        sex,
        "",  # alias
        "",  # race
        address,
        "",  # country code
        "",  # home phone number
        "",  # business phone number
        "",  # language
        "",  # marital status
        "",  # religion
        "",  # account number
        "",  # social security number
        "",  # driver's license number
        "",  # mother identifier
        "",  # ethnic group
        "",  # birthplace
        "",  # birth order
        "",  # citizenship
        "",  # veteran's military status
    ]
    return hl7.Segment(FIELD_SEPARATOR, fields)

347 

348 

# noinspection PyUnusedLocal
def make_obr_segment(task: "Task") -> hl7.Segment:
    # noinspection HttpUrlsUsage
    """
    Builds the observation request (OBR) segment of an HL7 message.

    - http://hl7reference.com/HL7%20Specifications%20ORM-ORU.PDF
    - Required in ORU^R01 message:

      - https://www.corepointhealth.com/resource-center/hl7-resources/hl7-oru-message
      - https://www.corepointhealth.com/resource-center/hl7-resources/hl7-obr-segment

    Args:
        task: not used by this function; the segment content is fixed

    Returns:
        the OBR segment
    """  # noqa

    source = "CamCOPS"
    universal_service_id = hl7.Field(
        COMPONENT_SEPARATOR,
        [source, "CamCOPS psychiatric/cognitive assessment"],
    )
    populated_fields = [
        "OBR",  # segment ID
        "1",  # set ID
        source,  # placer order number
        source,  # filler order number
        universal_service_id,
    ]
    # Everything after the universal service ID is sent blank. The names are
    # listed only to document which position is which.
    blank_field_names = (
        "priority",
        "requested_date_time",
        "observation_date_time",
        "observation_end_date_time",
        "collection_volume",
        "collector_identifier",
        "specimen_action_code",
        "danger_code",
        "relevant_clinical_information",
        "specimen_received_date_time",
        "ordering_provider",
        "order_callback_phone_number",
        "placer_field_1",
        "placer_field_2",
        "filler_field_1",
        "filler_field_2",
        "results_report_status_change_date_time",
        "charge_to_practice",
        "diagnostic_service_section_id",
        "result_status",
        "parent_result",
        "quantity_timing",
        "result_copies_to",
        "parent",
        "transportation_mode",
        "reason_for_study",
        "principal_result_interpreter",
        "assistant_result_interpreter",
        "technician",
        "transcriptionist",
        "scheduled_date_time",
        "number_of_sample_containers",
        "transport_logistics_of_collected_samples",
        "collectors_comment",
        "transport_arrangement_responsibility",
        "transport_arranged",
        "escort_required",
        "planned_patient_transport_comment",
    )
    return hl7.Segment(
        FIELD_SEPARATOR, populated_fields + [""] * len(blank_field_names)
    )

457 

458 

def make_obx_segment(
    req: "CamcopsRequest",
    task: "Task",
    task_format: str,
    observation_identifier: str,
    observation_datetime: Pendulum,
    responsible_observer: str,
    export_options: "TaskExportOptions",
) -> hl7.Segment:
    # noinspection HttpUrlsUsage
    """
    Creates an HL7 observation result (OBX) segment.

    The task is rendered in the requested format and embedded as an
    encapsulated data (ED) value.

    - http://www.hl7standards.com/blog/2006/10/18/how-do-i-send-a-binary-file-inside-of-an-hl7-message
    - http://www.hl7standards.com/blog/2007/11/27/pdf-attachment-in-hl7-message/
    - http://www.hl7standards.com/blog/2006/12/01/sending-images-or-formatted-documents-via-hl7-messaging/
    - https://www.hl7.org/documentcenter/public/wg/ca/HL7ClmAttIG.PDF
    - type of data:
      https://www.hl7.org/implement/standards/fhir/v2/0191/index.html
    - subtype of data:
      https://www.hl7.org/implement/standards/fhir/v2/0291/index.html

    Args:
        req: request object, passed to the task's rendering methods
        task: the task to render
        task_format: one of ``FileType.PDF``, ``FileType.HTML``,
            ``FileType.XML``
        observation_identifier: inserted unchanged
        observation_datetime: written in the ``DateFormat.HL7_DATETIME``
            format
        responsible_observer: inserted unchanged
        export_options: passed to the task's XML renderer (used for XML only)

    Returns:
        the OBX segment

    Raises:
        AssertionError: if ``task_format`` is not one of the formats above
    """  # noqa

    if task_format == FileType.PDF:
        data_type = "Application"
        data_subtype = "PDF"
        encoding = "Base64"  # base 64 encoding
        # standard_b64encode() returns bytes. Decode to text so that the
        # Base64 characters themselves (not a bytes repr such as b'...')
        # end up in the message. Base64 output is pure ASCII.
        data = base64.standard_b64encode(task.get_pdf(req)).decode("ascii")
    elif task_format == FileType.HTML:
        data_type = "TEXT"
        data_subtype = "HTML"
        encoding = "A"  # no encoding (see table 0299), but need to escape
        data = escape_hl7_text(task.get_html(req))
    elif task_format == FileType.XML:
        data_type = "TEXT"
        data_subtype = "XML"
        encoding = "A"  # no encoding (see table 0299), but need to escape
        data = escape_hl7_text(
            task.get_xml(req, indent_spaces=0, eol="", options=export_options)
        )
    else:
        raise AssertionError(
            f"make_obx_segment: invalid task_format: {task_format}"
        )

    observation_value = hl7.Field(
        COMPONENT_SEPARATOR,
        [
            "CamCOPS",  # source application
            data_type,  # type of data
            data_subtype,  # data subtype
            encoding,
            data,
        ],
    )
    date_and_time_of_observation = format_datetime(
        observation_datetime, DateFormat.HL7_DATETIME
    )

    fields = [
        "OBX",  # segment ID
        str(1),  # set ID
        "ED",  # value type: encapsulated data (ED) field
        observation_identifier,
        "",  # observation sub-ID
        observation_value,
        "",  # units
        "",  # reference range
        "",  # abnormal flags
        "",  # probability
        "",  # nature of abnormal test
        "",  # observation result status
        "",  # date of last observation normal values
        "",  # user-defined access checks
        date_and_time_of_observation,
        "",  # producer ID
        responsible_observer,
        "",  # observation method
        "",  # equipment instance identifier
        "",  # date/time of analysis
    ]
    return hl7.Segment(FIELD_SEPARATOR, fields)

572 

573 

def make_dg1_segment(
    set_id: int,
    diagnosis_datetime: Pendulum,
    coding_system: str,
    diagnosis_identifier: str,
    diagnosis_text: str,
    alternate_coding_system: str = "",
    alternate_diagnosis_identifier: str = "",
    alternate_diagnosis_text: str = "",
    diagnosis_type: str = "F",
    diagnosis_classification: str = "D",
    confidential_indicator: str = "N",
    clinician_id_number: Optional[Union[str, int]] = None,
    clinician_surname: str = "",
    clinician_forename: str = "",
    clinician_middle_name_or_initial: str = "",
    clinician_suffix: str = "",
    clinician_prefix: str = "",
    clinician_degree: str = "",
    clinician_source_table: str = "",
    clinician_assigning_authority: str = "",
    clinician_name_type_code: str = "",
    clinician_identifier_type_code: str = "",
    clinician_assigning_facility: str = "",
    attestation_datetime: Optional[Pendulum] = None,
) -> hl7.Segment:
    # noinspection HttpUrlsUsage
    """
    Creates an HL7 diagnosis (DG1) segment.

    Args:

    .. code-block:: none

        set_id: Diagnosis sequence number, starting with 1 (use higher numbers
            for >1 diagnosis).
        diagnosis_datetime: Date/time diagnosis was made.

        coding_system: E.g. "I9C" for ICD9-CM; "I10" for ICD10.
        diagnosis_identifier: Code.
        diagnosis_text: Text.

        alternate_coding_system: Optional alternate coding system.
        alternate_diagnosis_identifier: Optional alternate code.
        alternate_diagnosis_text: Optional alternate text.

        diagnosis_type: A admitting, W working, F final.
        diagnosis_classification: C consultation, D diagnosis, M medication,
            O other, R radiological scheduling, S sign and symptom,
            T tissue diagnosis, I invasive procedure not classified elsewhere.
        confidential_indicator: Y yes, N no

        clinician_id_number:              } Diagnosing clinician.
        clinician_surname:                } The ID number, if given, must be
        clinician_forename:               } convertible to an integer; a
        clinician_middle_name_or_initial: } MOD11 check digit is added for
        clinician_suffix:                 } it.
        clinician_prefix:                 }
        clinician_degree:                 }
        clinician_source_table:           }
        clinician_assigning_authority:    }
        clinician_name_type_code:         }
        clinician_identifier_type_code:   }
        clinician_assigning_facility:     }

        attestation_datetime: Date/time the diagnosis was attested.

    Returns:
        the DG1 segment

    Raises:
        AssertionError: if ``set_id`` or ``clinician_id_number`` cannot be
            converted to an integer, or if ``diagnosis_type``,
            ``diagnosis_classification`` or ``confidential_indicator`` is
            not one of the permitted codes

    - http://www.mexi.be/documents/hl7/ch600012.htm
    - https://www.hl7.org/special/committees/vocab/V26_Appendix_A.pdf
    """

    # set_id must look like an integer, but is emitted as given.
    try:
        int(set_id)
        set_id_text = str(set_id)
    except Exception as exc:
        raise AssertionError("make_dg1_segment: set_id invalid") from exc

    diagnosis_code = hl7.Field(
        COMPONENT_SEPARATOR,
        [
            diagnosis_identifier,
            diagnosis_text,
            coding_system,
            alternate_diagnosis_identifier,
            alternate_diagnosis_text,
            alternate_coding_system,
        ],
    )
    diagnosis_datetime_text = format_datetime(
        diagnosis_datetime, DateFormat.HL7_DATETIME
    )
    if diagnosis_type not in ("A", "W", "F"):
        raise AssertionError("make_dg1_segment: diagnosis_type invalid")

    # The clinician ID is normalized via int(), so e.g. " 12 " becomes "12".
    try:
        clinician_id_text = (
            str(int(clinician_id_number))
            if clinician_id_number is not None
            else ""
        )
    except Exception as exc:
        raise AssertionError(
            "make_dg1_segment: diagnosing_clinician_id_number invalid"
        ) from exc
    if clinician_id_text:
        clinician_id_check_digit = get_mod11_checkdigit(clinician_id_text)
        clinician_checkdigit_scheme = "M11"  # Mod 11 algorithm
    else:
        clinician_id_check_digit = ""
        clinician_checkdigit_scheme = ""
    diagnosing_clinician = hl7.Field(
        COMPONENT_SEPARATOR,
        [
            clinician_id_text,
            clinician_surname or "",
            clinician_forename or "",
            clinician_middle_name_or_initial or "",
            clinician_suffix or "",
            clinician_prefix or "",
            clinician_degree or "",
            clinician_source_table or "",
            clinician_assigning_authority or "",
            clinician_name_type_code or "",
            clinician_id_check_digit or "",
            clinician_checkdigit_scheme or "",
            clinician_identifier_type_code or "",
            clinician_assigning_facility or "",
        ],
    )

    if diagnosis_classification not in (
        "C",
        "D",
        "M",
        "O",
        "R",
        "S",
        "T",
        "I",
    ):
        raise AssertionError(
            "make_dg1_segment: diagnosis_classification invalid"
        )
    if confidential_indicator not in ("Y", "N"):
        raise AssertionError(
            "make_dg1_segment: confidential_indicator invalid"
        )
    attestation_datetime_text = (
        format_datetime(attestation_datetime, DateFormat.HL7_DATETIME)
        if attestation_datetime
        else ""
    )

    fields = [
        "DG1",  # segment ID
        set_id_text,
        "",  # diagnosis coding method
        diagnosis_code,
        "",  # diagnosis description
        diagnosis_datetime_text,
        diagnosis_type,
        "",  # major diagnostic category
        "",  # diagnostic related group
        "",  # DRG approval indicator
        "",  # DRG grouper review code
        "",  # outlier type
        "",  # outlier days
        "",  # outlier cost
        "",  # grouper version and type
        "",  # diagnosis priority
        diagnosing_clinician,
        diagnosis_classification,
        confidential_indicator,
        attestation_datetime_text,
    ]
    return hl7.Segment(FIELD_SEPARATOR, fields)

762 

763 

def escape_hl7_text(s: str) -> str:
    # noinspection HttpUrlsUsage
    """
    Escapes HL7 special characters.

    - http://www.mexi.be/documents/hl7/ch200034.htm
    - http://www.mexi.be/documents/hl7/ch200071.htm

    Each delimiter is replaced by its HL7 escape sequence, written between a
    pair of escape characters:

    - escape character: ``E``
    - field separator: ``F``
    - component separator: ``S``
    - subcomponent separator: ``T``
    - repetition separator: ``R``
    - line break: ``.br``

    Line breaks may be ``\\n``, ``\\r\\n`` or ``\\r``. Carriage returns have
    to be escaped too because a raw one is the segment separator and would
    end the segment early.

    Linebreaks:

    - http://www.healthintersections.com.au/?p=344
    - https://groups.google.com/forum/#!topic/ensemble-in-healthcare/wP2DWMeFrPA
    - http://www.hermetechnz.com/documentation/sqlschema/index.html?hl7_escape_rules.htm

    Args:
        s: text to escape

    Returns:
        the escaped text
    """  # noqa

    def sequence(code: str) -> str:
        """Wraps an escape code in escape characters."""
        return ESCAPE_CHARACTER + code + ESCAPE_CHARACTER

    linebreak = sequence(".br")
    # A single translate() pass maps each original character exactly once,
    # so the escape characters added here are never themselves re-escaped.
    translation = str.maketrans(
        {
            ESCAPE_CHARACTER: sequence("E"),
            FIELD_SEPARATOR: sequence("F"),
            COMPONENT_SEPARATOR: sequence("S"),
            SUBCOMPONENT_SEPARATOR: sequence("T"),
            REPETITION_SEPARATOR: sequence("R"),
            "\n": linebreak,
            "\r": linebreak,
        }
    )
    # Collapse CR+LF first so that it yields one line break, not two.
    return s.replace("\r\n", "\n").translate(translation)

791 

792 

def msg_is_successful_ack(msg: hl7.Message) -> Tuple[bool, Optional[str]]:
    # noinspection HttpUrlsUsage
    """
    Reports whether a reply is a successful HL7 acknowledgement.

    A successful reply has exactly two segments: an MSH segment whose
    message type is ``ACK``, then an MSA segment whose acknowledgement code
    is ``AA``.

    - http://hl7reference.com/HL7%20Specifications%20ORM-ORU.PDF

    Args:
        msg: the reply message, or ``None``

    Returns:
        tuple: ``success, failure_reason``, where ``failure_reason`` is
        ``None`` on success
    """

    def failure(reason: str) -> Tuple[bool, Optional[str]]:
        return False, reason

    if msg is None:
        return failure("Reply is None")

    # Expect exactly two segments: MSH, then MSA.
    n_segments = len(msg)
    if n_segments != 2:
        return failure(f"Reply doesn't have 2 segments (has {n_segments})")
    header = msg[0]
    acknowledgement = msg[1]

    # MSH: segment ID at index 0, message type at index 8.
    n_header_fields = len(header)
    if n_header_fields < 9:
        return failure(
            f"First (MSH) segment has <9 fields (has {n_header_fields})"
        )
    header_id = header[0]
    message_type = header[8]
    if header_id != ["MSH"]:
        return failure(
            f"First (MSH) segment ID is not 'MSH' (is {header_id})"
        )
    if message_type != ["ACK"]:
        return failure(f"MSH message type is not 'ACK' (is {message_type})")

    # MSA: segment ID at index 0, acknowledgement code at index 1.
    n_ack_fields = len(acknowledgement)
    if n_ack_fields < 2:
        return failure(
            f"Second (MSA) segment has <2 fields (has {n_ack_fields})"
        )
    ack_id = acknowledgement[0]
    ack_code = acknowledgement[1]
    if ack_id != ["MSA"]:
        return failure(f"Second (MSA) segment ID is not 'MSA' (is {ack_id})")
    if ack_code != ["AA"]:
        # AA for success, AE for error
        return failure(
            f"MSA acknowledgement code is not 'AA' (is {ack_code})"
        )

    return True, None

853 

854 

855# ============================================================================= 

856# MLLPTimeoutClient 

857# ============================================================================= 

858# Modification of MLLPClient from python-hl7, to allow timeouts and failure. 

859 

# MLLP framing characters. A message is sent as <SB> message <EB> <CR>.
SB = "\x0b"  # start block: vertical tab
EB = "\x1c"  # end block: file separator
CR = "\x0d"  # carriage return, \r
FF = "\x0c"  # form feed (new page)

# Maximum number of bytes read from the socket in one recv() call.
RECV_BUFFER = 4096

866 

867 

class MLLPTimeoutClient(object):
    """
    Class for MLLP TCP/IP transmission that implements timeouts.

    The connection is opened by the constructor and released by
    :meth:`close`, or automatically when used in a ``with`` statement.
    """

    def __init__(
        self, host: str, port: int, timeout_ms: Optional[int] = None
    ) -> None:
        """
        Creates MLLP client and opens socket.

        Args:
            host: host to connect to
            port: TCP port to connect to
            timeout_ms: socket timeout in milliseconds, applied to
                connecting, sending and receiving; ``None`` means no timeout

        Raises:
            whatever the socket raises if the connection fails (including
            ``socket.timeout``); the socket is closed first
        """
        self.encoding = "utf-8"
        timeout_s = (
            float(timeout_ms) / float(1000) if timeout_ms is not None else None
        )
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.settimeout(timeout_s)
            self.socket.connect((host, port))
        except BaseException:
            # No object is returned to the caller on failure, so nobody else
            # could close the socket; release it here and re-raise.
            self.socket.close()
            raise

    def __enter__(self) -> "MLLPTimeoutClient":
        """
        For use with "with" statement.
        """
        return self

    # noinspection PyUnusedLocal
    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        """
        For use with "with" statement. Closes the socket; exceptions raised
        inside the ``with`` block are not suppressed.
        """
        self.close()

        return None

    def close(self) -> None:
        """
        Release the socket connection.
        """
        self.socket.close()

    def send_message(
        self, message: Union[str, hl7.Message]
    ) -> Tuple[bool, Optional[str]]:
        """
        Wraps a string or :class:`hl7.Message` in a MLLP container
        and sends the message to the server.

        Returns ``success, ack_msg``.
        """
        if isinstance(message, hl7.Message):
            message = str(message)
        # wrap in MLLP message container
        data = SB + message + CR + EB + CR
        # ... the CR immediately after the message is my addition, because
        # HL7 Inspector otherwise says: "Warning: last segment have no segment
        # termination char 0x0d !" (sic).
        return self.send(data.encode(self.encoding))

    def send(self, data: bytes) -> Tuple[bool, Optional[str]]:
        """
        Low-level, direct access to the socket (data must be already wrapped
        in an MLLP container). Blocks until the server returns, or until the
        timeout expires.

        Returns ``success, ack_msg``. If no reply arrives within the timeout,
        returns ``False, None``. The reply is returned as received, decoded
        as UTF-8; any MLLP framing characters are not stripped. Only a single
        ``recv()`` of up to ``RECV_BUFFER`` bytes is performed.

        A timeout (or other socket error) while *sending* is not caught
        here and propagates to the caller.
        """
        # upload the data; sendall() keeps going until every byte has been
        # handed over, whereas send() may transmit only part of a large
        # message
        self.socket.sendall(data)
        # wait for the ACK/NACK
        try:
            ack_msg = self.socket.recv(RECV_BUFFER).decode(self.encoding)
            return True, ack_msg
        except socket.timeout:
            return False, None