Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/hl7/containers.py : 21%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2from __future__ import unicode_literals
3import datetime
4import logging
5import six
6from .compat import python_2_unicode_compatible
7from .accessor import Accessor
8from .util import generate_message_control_id
# Module-level logger. It is named after the module's import path
# (``__name__``) rather than ``__file__``: a filesystem path contains dots
# and slashes, which ``logging`` would interpret as a nonsensical logger
# hierarchy and which cannot be configured portably by applications.
logger = logging.getLogger(__name__)
# Unique marker used to tell "no value passed" apart from an explicit None.
_SENTINEL = object()


class Sequence(list):
    """List subclass that can also be called with HL7-style 1-based indices."""

    def __call__(self, index, value=_SENTINEL):
        """Read or write an element using an HL7 compatible 1-based index.

        With only ``index`` the element is returned; when ``value`` is
        supplied the element is replaced instead and nothing is returned.

        >>> s = hl7.Sequence([1, 2, 3, 4])
        >>> s(1) == s[0]
        True
        >>> s(2, "new")
        >>> s
        [1, 'new', 3, 4]
        """
        position = self._adjust_index(int(index))
        if value is not _SENTINEL:
            self[position] = value
            return None
        return self[position]

    def _adjust_index(self, index):
        """Translate a 1-based index into a list position.

        Indices below 1 are passed through untouched. Subclasses can
        override this if they do not want HL7 1-based indexing when used
        as a callable.
        """
        return index - 1 if index >= 1 else index
@python_2_unicode_compatible
class Container(Sequence):
    """Abstract root class for the parts of the HL7 message."""

    def __init__(self, separator, sequence=[], esc='\\', separators='\r|~^&', factory=None):
        # The shared default list is never mutated: list.__init__ copies
        # the items of ``sequence`` into this instance, so the mutable
        # default argument is harmless here.
        super(Container, self).__init__(sequence)
        self.separator = separator
        self.esc = esc
        self.separators = separators
        if factory is None:
            factory = Factory
        self.factory = factory

    def __getitem__(self, item):
        """Return one child, or a new container of the same class for a slice."""
        result = super(Container, self).__getitem__(item)
        if not isinstance(item, slice):
            return result
        # A plain list slice would lose the container type, so re-wrap the
        # sliced items in the same class with the same settings.
        return self.__class__(
            self.separator, result, self.esc, self.separators, factory=self.factory
        )

    def __getslice__(self, i, j):
        """Python 2.x slice hook; delegates to :py:meth:`__getitem__`.

        ``__getslice__`` is deprecated, but Python 2 still calls it for
        simple slices, so route it through the slice handling above.
        """
        return self.__getitem__(slice(i, j))

    def __str__(self):
        """Join the child containers into a single string, separated
        by ``self.separator``. This acts recursively, because each child
        is converted to text in turn. Thus ``unicode()`` is the
        appropriate way of turning the python-hl7 representation of
        HL7 back into a standard string.

        >>> unicode(h) == message
        True

        .. note::
           For Python 2.x use ``unicode()``, but for Python 3.x, use
           ``str()``

        """
        return self.separator.join(six.text_type(child) for child in self)
class Message(Container):
    """Representation of an HL7 message. It contains a list
    of :py:class:`hl7.Segment` instances.
    """

    def __getitem__(self, key):
        """Index, segment-based or accessor lookup.

        If key is an integer, ``__getitem__`` acts like a list, returning
        the :py:class:`hl7.Segment` held at that index:

        >>> h[1]  # doctest: +ELLIPSIS
        [[u'PID'], ...]

        If the key is a string of length 3, ``__getitem__`` acts like a dictionary,
        returning all segments whose *segment_id* is *key*
        (alias of :py:meth:`hl7.Message.segments`).

        >>> h['OBX']  # doctest: +ELLIPSIS
        [[[u'OBX'], [u'1'], ...]]

        Any other string key is parsed into an :py:class:`hl7.Accessor`
        and passed to :py:meth:`hl7.Message.extract_field`.

        If the key is an :py:class:`hl7.Accessor`, it is passed to
        :py:meth:`hl7.Message.extract_field`.
        """
        if isinstance(key, six.string_types):
            if len(key) == 3:
                return self.segments(key)
            return self.extract_field(*Accessor.parse_key(key))
        elif isinstance(key, Accessor):
            return self.extract_field(*key)
        return super(Message, self).__getitem__(key)

    def __setitem__(self, key, value):
        """Index or accessor assignment.

        If key is an integer, ``__setitem__`` acts like a list, setting
        the :py:class:`hl7.Segment` held at that index:

        >>> h[1] = hl7.Segment("|", [hl7.Field("^", [u'PID'], [u''])])

        If the key is a string of length greater than 3 and the value is
        a string, the key is parsed into an :py:class:`hl7.Accessor` and
        passed to :py:meth:`hl7.Message.assign_field`.

        >>> h["PID.2"] = "NEW"

        If the key is an :py:class:`hl7.Accessor`, it is passed to
        :py:meth:`hl7.Message.assign_field`.

        Every other combination falls through to plain list assignment.
        """
        if isinstance(key, six.string_types) and len(key) > 3 and isinstance(value, six.string_types):
            return self.assign_field(value, *Accessor.parse_key(key))
        elif isinstance(key, Accessor):
            return self.assign_field(value, *key)
        return super(Message, self).__setitem__(key, value)

    def segment(self, segment_id):
        """Gets the first segment with the *segment_id* from the parsed
        *message*.

        >>> h.segment('PID')  # doctest: +ELLIPSIS
        [[u'PID'], ...]

        :rtype: :py:class:`hl7.Segment`
        :raises KeyError: if the message holds no such segment
        """
        # Get the list of all the segments and pull out the first one.
        # We should never get an IndexError here, since segments() raises
        # a KeyError instead of returning an empty result.
        match = self.segments(segment_id)
        return match[0]

    def segments(self, segment_id):
        """Returns the requested segments from the parsed *message* that are
        identified by the *segment_id* (e.g. OBR, MSH, ORC, OBX).

        >>> h.segments('OBX')
        [[[u'OBX'], [u'1'], ...]]

        :rtype: list of :py:class:`hl7.Segment`
        :raises KeyError: if no segment matches *segment_id*
        """
        # Compare segment_id to the very first string in each segment,
        # returning all segments that match.
        # Return as a Sequence so 1-based indexing can be used
        matches = Sequence(segment for segment in self if segment[0][0] == segment_id)
        if not matches:
            raise KeyError('No %s segments' % segment_id)
        return matches

    def extract_field(self, segment, segment_num=1, field_num=1, repeat_num=1, component_num=1, subcomponent_num=1):
        """
        Extract a field using a future proofed approach, based on rules in:
        http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing

        'PID|Field1|Component1^Component2|Component1^Sub-Component1&Sub-Component2^Component3|Repeat1~Repeat2',

            | PID.F3.R1.C2.S2 = 'Sub-Component2'
            | PID.F4.R2.C1 = 'Repeat1'

        Compatibility Rules:

            If the parse tree is deeper than the specified path continue
            following the first child branch until a leaf of the tree is
            encountered and return that value (which could be blank).

            Example:

                | PID.F3.R1.C2 = 'Sub-Component1' (assume .SC1)

            If the parse tree terminates before the full path is satisfied
            check each of the subsequent paths and if every one is specified
            at position 1 then the leaf value reached can be returned as the
            result.

                | PID.F4.R1.C1.SC1 = 'Repeat1'    (ignore .SC1)

        The returned value has been passed through :py:meth:`unescape`;
        an empty string is returned for optional values that are absent.

        :raises KeyError: if the segment does not exist
        :raises IndexError: if the path cannot be satisfied
        """
        # Save original values for error messages
        accessor = Accessor(segment, segment_num, field_num, repeat_num, component_num, subcomponent_num)

        # A missing (None / 0) path element means "the first one"
        field_num = field_num or 1
        repeat_num = repeat_num or 1
        component_num = component_num or 1
        subcomponent_num = subcomponent_num or 1

        segment = self.segments(segment)(segment_num)
        # Index 0 of a segment is its name, so field N lives at list index
        # N and a valid field number is strictly less than len(segment).
        if field_num < len(segment):
            field = segment(field_num)
        else:
            if repeat_num == 1 and component_num == 1 and subcomponent_num == 1:
                return ''  # Assume non-present optional value
            raise IndexError('Field not present: {0}'.format(accessor.key))

        rep = field(repeat_num)

        if not isinstance(rep, Repetition):
            # leaf
            if component_num == 1 and subcomponent_num == 1:
                return self.unescape(rep)
            raise IndexError('Field reaches leaf node before completing path: {0}'.format(accessor.key))

        if component_num > len(rep):
            if subcomponent_num == 1:
                return ''  # Assume non-present optional value
            raise IndexError('Component not present: {0}'.format(accessor.key))

        component = rep(component_num)
        if not isinstance(component, Component):
            # leaf
            if subcomponent_num == 1:
                return self.unescape(component)
            raise IndexError('Field reaches leaf node before completing path: {0}'.format(accessor.key))

        if subcomponent_num <= len(component):
            subcomponent = component(subcomponent_num)
            return self.unescape(subcomponent)
        else:
            return ''  # Assume non-present optional value

    def assign_field(self, value, segment, segment_num=1, field_num=None, repeat_num=None, component_num=None, subcomponent_num=None):
        """
        Assign a value into a message using the tree based assignment notation.
        The segment must exist.

        Missing fields, repetitions, components and sub-components along
        the path are created (empty) as needed. The value replaces the
        whole content of the deepest level that was specified.

        Uses a future proofed approach, based on rules in:
        http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing

        :raises KeyError: if the segment does not exist
        """
        segment = self.segments(segment)(segment_num)

        # ``<=`` rather than ``<``: index 0 of a segment is its name, so
        # field N needs len(segment) to be at least N + 1.
        while len(segment) <= field_num:
            segment.append(self.create_field([]))
        field = segment(field_num)
        if repeat_num is None:
            field[:] = [value]
            return
        while len(field) < repeat_num:
            field.append(self.create_repetition([]))
        repetition = field(repeat_num)
        if component_num is None:
            repetition[:] = [value]
            return
        while len(repetition) < component_num:
            repetition.append(self.create_component([]))
        component = repetition(component_num)
        if subcomponent_num is None:
            component[:] = [value]
            return
        while len(component) < subcomponent_num:
            component.append('')
        component(subcomponent_num, value)

    def escape(self, field, app_map=None):
        """
        See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/

        To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.

        Pass through the message. Replace recognised characters with their escaped
        version. Return an ascii encoded string.

        Functionality:

        *   Replace separator characters (2.10.4)
        *   replace application defined characters (2.10.7)
        *   Replace non-ascii values with hex versions using HL7 conventions.

        Incomplete:

        *   replace highlight characters (2.10.3)
        *   How to handle the rich text substitutions.
        *   Merge contiguous hex values

        :param field: text to escape; falsy values are returned unchanged
        :param app_map: optional mapping of character -> escape code that
            takes precedence over the built-in table
        """
        if not field:
            return field

        esc = str(self.esc)

        DEFAULT_MAP = {
            self.separators[1]: 'F',  # 2.10.4
            self.separators[2]: 'R',
            self.separators[3]: 'S',
            self.separators[4]: 'T',
            self.esc: 'E',
            '\r': '.br',  # 2.10.6
        }

        rv = []
        for c in field:
            if app_map and c in app_map:
                rv.append(esc + app_map[c] + esc)
            elif c in DEFAULT_MAP:
                rv.append(esc + DEFAULT_MAP[c] + esc)
            elif ord(c) >= 0x20 and ord(c) <= 0x7E:
                rv.append(c)
            else:
                # Zero-pad to two hex digits: a plain ``%2x`` pads with a
                # space, producing e.g. ``\X 9\`` for a tab character.
                rv.append('%sX%02x%s' % (esc, ord(c), esc))

        return ''.join(rv)

    def unescape(self, field, app_map=None):
        """
        See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/

        To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.

        This will convert the identifiable sequences.
        If the application provides mapping, these are also used.
        Items which cannot be mapped are removed (and logged).

        For example, the App Map could provide N, H, Zxxx values

        Chapter 2: Section 2.10

        At the moment, this functionality can:

        *   replace the parsing characters (2.10.4)
        *   replace highlight characters (2.10.3)
        *   replace hex characters. (2.10.5)
        *   replace rich text characters (2.10.6)
        *   replace application defined characters (2.10.7)

        It cannot:

        *   switch code pages / ISO IR character sets

        :param field: text to unescape; returned unchanged when it is
            falsy or contains no escape character
        :param app_map: optional mapping of escape code -> replacement
            that takes precedence over the built-in table
        """
        if not field or field.find(self.esc) == -1:
            return field

        DEFAULT_MAP = {
            'H': '_',  # Override using the APP MAP: 2.10.3
            'N': '_',  # Override using the APP MAP
            'F': self.separators[1],  # 2.10.4
            'R': self.separators[2],
            'S': self.separators[3],
            'T': self.separators[4],
            'E': self.esc,
            '.br': '\r',  # 2.10.6
            '.sp': '\r',
            '.fi': '',
            '.nf': '',
            '.in': '    ',
            '.ti': '    ',
            '.sk': ' ',
            '.ce': '\r',
        }

        rv = []
        collecting = []
        in_seq = False
        for offset, c in enumerate(field):
            if in_seq:
                if c == self.esc:
                    # Closing escape character: decode what was collected
                    in_seq = False
                    value = ''.join(collecting)
                    collecting = []
                    if not value:
                        logger.warning('Error unescaping value [%s], empty sequence found at %d', field, offset)
                        continue
                    if app_map and value in app_map:
                        rv.append(app_map[value])
                    elif value in DEFAULT_MAP:
                        rv.append(DEFAULT_MAP[value])
                    elif value.startswith('.') and ((app_map and value[:3] in app_map) or value[:3] in DEFAULT_MAP):
                        # Substitution with a number of repetitions defined (2.10.6)
                        if app_map and value[:3] in app_map:
                            ch = app_map[value[:3]]
                        else:
                            ch = DEFAULT_MAP[value[:3]]
                        try:
                            count = int(value[3:])
                        except ValueError:
                            # Malformed repeat count: drop the sequence
                            # like any other unmappable item.
                            logger.error('Error decoding value [%s], field [%s], offset [%s]', value, field, offset)
                        else:
                            rv.append(ch * count)

                    elif value[0] == 'C':  # Convert to new Single Byte character set : 2.10.2
                        # Two HEX values, first value chooses the character set (ISO-IR), second gives the value
                        logger.warning('Error inline character sets [%s] not implemented, field [%s], offset [%s]', value, field, offset)
                    elif value[0] == 'M':  # Switch to new Multi Byte character set : 2.10.2
                        # Three HEX values, first value chooses the character set (ISO-IR), rest give the value
                        logger.warning('Error inline character sets [%s] not implemented, field [%s], offset [%s]', value, field, offset)
                    elif value[0] == 'X':  # Hex encoded Bytes: 2.10.5
                        value = value[1:]
                        try:
                            for off in range(0, len(value), 2):
                                rv.append(six.unichr(int(value[off:off + 2], 16)))
                        except ValueError:
                            logger.exception('Error decoding hex value [%s], field [%s], offset [%s]', value, field, offset)
                    else:
                        # Not inside an exception handler, so log a plain
                        # error rather than logger.exception().
                        logger.error('Error decoding value [%s], field [%s], offset [%s]', value, field, offset)
                else:
                    collecting.append(c)
            elif c == self.esc:
                in_seq = True
            else:
                rv.append(six.text_type(c))

        return ''.join(rv)

    def create_message(self, seq):
        """Create a new :py:class:`hl7.Message` compatible with this message"""
        return self.factory.create_message(self.separators[0], seq, esc=self.esc, separators=self.separators, factory=self.factory)

    def create_segment(self, seq):
        """Create a new :py:class:`hl7.Segment` compatible with this message"""
        return self.factory.create_segment(self.separators[1], seq, esc=self.esc, separators=self.separators[1:], factory=self.factory)

    def create_field(self, seq):
        """Create a new :py:class:`hl7.Field` compatible with this message"""
        return self.factory.create_field(self.separators[2], seq, esc=self.esc, separators=self.separators[2:], factory=self.factory)

    def create_repetition(self, seq):
        """Create a new :py:class:`hl7.Repetition` compatible with this message"""
        return self.factory.create_repetition(self.separators[3], seq, esc=self.esc, separators=self.separators[3:], factory=self.factory)

    def create_component(self, seq):
        """Create a new :py:class:`hl7.Component` compatible with this message"""
        return self.factory.create_component(self.separators[4], seq, esc=self.esc, separators=self.separators[4:], factory=self.factory)

    def create_ack(self, ack_code='AA', message_id=None, application=None, facility=None):
        """
        Create an hl7 ACK response :py:class:`hl7.Message`, per spec 2.9.2, for this message.

        See http://www.hl7standards.com/blog/2007/02/01/ack-message-original-mode-acknowledgement/

        ``ack_code`` options are one of `AA` (accept), `AR` (reject), `AE` (error)
        (see HL7 Table 0008 - Acknowledgment Code)
        ``message_id`` control message ID for ACK, defaults to unique generated ID
        ``application`` name of sending application, defaults to receiving application of message
        ``facility`` name of sending facility, defaults to receiving facility of message
        """
        source_msh = self.segment('MSH')
        msh = self.create_segment([self.create_field(['MSH'])])
        msa = self.create_segment([self.create_field(['MSA'])])
        ack = self.create_message([msh, msa])

        # Reuse the separator / encoding characters of the source message
        ack.assign_field(six.text_type(source_msh(1)), 'MSH', 1, 1)
        ack.assign_field(six.text_type(source_msh(2)), 'MSH', 1, 2)
        # Sending application is source receiving application
        ack.assign_field(six.text_type(application) if application is not None else six.text_type(source_msh(5)), 'MSH', 1, 3)
        # Sending facility is source receiving facility
        ack.assign_field(six.text_type(facility) if facility is not None else six.text_type(source_msh(6)), 'MSH', 1, 4)
        # Receiving application is source sending application
        ack.assign_field(six.text_type(source_msh(3)), 'MSH', 1, 5)
        # Receiving facility is source sending facility
        ack.assign_field(six.text_type(source_msh(4)), 'MSH', 1, 6)
        # Timestamp of the acknowledgement (UTC, YYYYMMDDHHMMSS)
        ack.assign_field(six.text_type(datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")), 'MSH', 1, 7)
        # Message type code
        ack.assign_field('ACK', 'MSH', 1, 9, 1, 1)
        # Copy trigger event from source
        ack.assign_field(six.text_type(source_msh(9)(1)(2)), 'MSH', 1, 9, 1, 2)
        ack.assign_field(message_id if message_id is not None else generate_message_control_id(), 'MSH', 1, 10)
        ack.assign_field(six.text_type(source_msh(11)), 'MSH', 1, 11)
        ack.assign_field(six.text_type(source_msh(12)), 'MSH', 1, 12)

        ack.assign_field(six.text_type(ack_code), 'MSA', 1, 1)
        # MSA.2 echoes the control ID of the message being acknowledged
        ack.assign_field(six.text_type(source_msh(10)), 'MSA', 1, 2)

        return ack
@python_2_unicode_compatible
class Segment(Container):
    """Second level of an HL7 message, which represents an HL7 Segment.
    Traditionally this is a line of a message that ends with a carriage
    return and is separated by pipes. It contains a list of
    :py:class:`hl7.Field` instances.
    """

    def _adjust_index(self, index):
        # Position 0 holds the segment name, so field N already sits at
        # list index N and no 1-based adjustment is required.
        return index

    def __str__(self):
        """Render the segment as text.

        For ``MSH`` and ``FHS`` segments the second element is emitted
        directly after the name and again after the third element, and
        only the elements from index 3 onwards are joined with the
        separator. Every other segment is a plain separator join.
        """
        name = six.text_type(self[0])
        if name not in ('MSH', 'FHS'):
            return self.separator.join(six.text_type(part) for part in self)
        field_sep = six.text_type(self[1])
        encoding_chars = six.text_type(self[2])
        remainder = self.separator.join(six.text_type(part) for part in self[3:])
        return name + field_sep + encoding_chars + field_sep + remainder
class Field(Container):
    """Third level of an HL7 message.

    Traditionally a field is surrounded by pipes and separated by carets.
    It contains a list of strings or :py:class:`hl7.Repetition` instances.
    """
class Repetition(Container):
    """Fourth level of an HL7 message: one occurrence of a repeating field.

    It contains a list of strings or :py:class:`hl7.Component` instances.
    """
class Component(Container):
    """Fifth level of an HL7 message. A component is a composite datatype.

    It contains a list of string sub-components.
    """
class Factory(object):
    """Factory used to create each type of Container.

    Each attribute is the class that gets called to build that level of
    the message. A subclass can be used to create specialized subclasses
    of each container.
    """

    #: Create an instance of :py:class:`hl7.Message`
    create_message = Message
    #: Create an instance of :py:class:`hl7.Segment`
    create_segment = Segment
    #: Create an instance of :py:class:`hl7.Field`
    create_field = Field
    #: Create an instance of :py:class:`hl7.Repetition`
    create_repetition = Repetition
    #: Create an instance of :py:class:`hl7.Component`
    create_component = Component