Coverage for /home/martinb/workspace/client-py/fhirclient/models/fhirsearch.py : 30%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Create FHIR search params from NoSQL-like query structures.
5# 2014, SMART Health IT.
7import logging
9from . import fhirreference
11try:
12 from urllib import quote_plus
13except Exception as e:
14 from urllib.parse import quote_plus
16logger = logging.getLogger(__name__)
19class FHIRSearch(object):
20 """ Create a FHIR search from NoSQL-like query structures.
21 """
23 def __init__(self, resource_type, struct=None):
24 self.resource_type = resource_type
25 """ The resource type class. """
27 self.params = []
28 """ FHIRSearchParam instances. """
30 self.wants_expand = False
31 """ Used internally; whether or not `params` must be expanded first. """
33 self.includes = []
34 """ Used internally; stores list of included resources for the search. """
36 if struct is not None:
37 if dict != type(struct):
38 raise Exception("Must pass a Python dictionary, but got a {}".format(type(struct)))
39 self.wants_expand = True
40 for key, val in struct.items():
41 self.params.append(FHIRSearchParam(key, val))
44 # MARK: Execution
46 def construct(self):
47 """ Constructs the URL with query string from the receiver's params.
48 """
49 if self.resource_type is None:
50 raise Exception("Need resource_type set to construct a search query")
52 parts = []
53 if self.params is not None:
54 for param in self.params:
55 if self.wants_expand:
56 for expanded in param.handle():
57 parts.append(expanded.as_parameter())
58 else:
59 parts.append(param.as_parameter())
61 for reference_model, reference_field, reverse in self.includes:
62 key = '_revinclude' if reverse else '_include'
63 parameter = '{}={}:{}'.format(
64 key, reference_model.resource_type, reference_field
65 )
66 parts.append(parameter)
68 return '{}?{}'.format(self.resource_type.resource_type, '&'.join(parts))
70 def include(self, reference_field, reference_model=None, reverse=False):
71 """ Add a resource to be included in the search results. Includes will
72 fetch additional resources referred to by the search results, or
73 additional resources which themselves refer to the search results
74 (reverse include). Recursive or iterative includes are not supported.
75 Provides a fluent interface to allow method chaining.
77 To include Patient resources when searching Observations:
78 `s = FHIRSearch(Observation).include('subject')`
79 To include Observation resources when searching Patients:
80 `s = FHIRSearch(Patient).include('subject', Observation, reverse=True)`
82 :param reference_field: The name of the search parameter (must be
83 FHIRReference type)
84 :param reference_model: The type of the source resource from which the
85 join comes (only used for reverse includes)
86 :param reverse: Whether this is a reverse include
87 :returns: This FHIRSearch instance
88 """
90 if reference_model is None:
91 reference_model = self.resource_type
93 model_fields = {
94 name: typ
95 for name, _, typ, _, _, _
96 in reference_model().elementProperties()
97 }
99 if model_fields.get(reference_field) is not fhirreference.FHIRReference:
100 logging.warning(
101 '%s does not have a reference type element named %s',
102 reference_model.resource_type, reference_field
103 )
104 return self
106 if reference_model is not self.resource_type and not reverse:
107 logging.warning('Only reverse includes can have a different reference model')
108 reverse = True
110 self.includes.append((reference_model, reference_field, reverse))
111 return self
113 def perform(self, server):
114 """ Construct the search URL and execute it against the given server.
116 :param server: The server against which to perform the search
117 :returns: A Bundle resource
118 """
119 if server is None:
120 raise Exception("Need a server to perform search")
122 from . import bundle
123 res = server.request_json(self.construct())
124 bundle = bundle.Bundle(res)
125 bundle.origin_server = server
126 return bundle
128 def perform_resources(self, server):
129 """ Performs the search by calling `perform`, then extracts all Bundle
130 entries and returns a list of Resource instances.
132 :param server: The server against which to perform the search
133 :returns: A list of Resource instances
134 """
135 bundle = self.perform(server)
136 resources = []
137 if bundle is not None and bundle.entry is not None:
138 for entry in bundle.entry:
139 resources.append(entry.resource)
141 return resources
144class FHIRSearchParam(object):
145 """ Holds one search parameter.
147 The instance's `value` can either be a string value or a search construct
148 dictionary. In the latter case the class's `handle` method must be called
149 to arrive at search parameter instances that can be converted into a URL
150 query.
151 """
153 def __init__(self, name, value):
154 self.name = name
155 self.value = value
157 def copy(self):
158 clone = object.__new__(self.__class__)
159 clone.__dict__ = self.__dict__.copy()
160 return clone
162 def handle(self):
163 """ Parses the receiver's value and returns a list of FHIRSearchParam
164 instances. Needs only be called if the param needs to be handled, i.e.
165 its value is a query structure.
167 :returns: A list with one or more FHIRSearchParam instances, not
168 altering the receiver
169 """
170 handler = FHIRSearchParamHandler.handler_for(self.name)(None, self.value)
171 return handler.handle(self.copy())
173 def as_parameter(self):
174 """ Return a string that represents the reciever as "key=value".
175 """
176 return '{}={}'.format(self.name, quote_plus(self.value, safe=',<=>'))
179class FHIRSearchParamHandler(object):
180 handles = None
181 handlers = []
183 @classmethod
184 def announce_handler(cls, handler):
185 cls.handlers.append(handler)
187 @classmethod
188 def handler_for(cls, key):
189 for handler in cls.handlers:
190 if handler.can_handle(key):
191 return handler
192 return cls
194 @classmethod
195 def can_handle(cls, key):
196 if cls.handles is not None:
197 return key in cls.handles
198 return True # base class handles everything else, so be sure to test it last!
201 def __init__(self, key, value):
202 self.key = key
203 self.value = value
204 self.modifier = []
205 self.multiplier = []
207 def handle(self, param):
208 """ Applies all handlers to the given search parameter.
209 :returns: A list of one or more new `FHIRSearchParam` instances
210 """
211 self.prepare()
212 return self.expand(param)
214 def prepare(self, parent=None):
215 """ Creates sub-handlers as needed, then prepares the receiver.
216 """
217 if dict == type(self.value):
218 for key, val in self.value.items():
219 handler = FHIRSearchParamHandler.handler_for(key)(key, val)
220 handler.prepare(self)
222 if parent is not None:
223 parent.multiplier.append(self)
225 def expand(self, param):
226 """ Executes the receiver's modifier and multiplier on itself, applying
227 changes to the given search param instance.
229 :returns: A list of one or more FHIRSearchParam instances
230 """
231 for handler in self.modifier:
232 handler.expand(param)
234 self.apply(param)
236 # if we have multiplier, expand sequentially
237 if len(self.multiplier) > 0:
238 expanded = []
239 for handler in self.multiplier:
240 clone = param.copy()
241 expanded.extend(handler.expand(clone))
243 return expanded
245 # no multiplier, just return the passed-in paramater
246 return [param]
248 def apply(self, param):
249 if self.key is not None:
250 param.name = '{}.{}'.format(param.name, self.key)
251 if 0 == len(self.multiplier):
252 param.value = self.value
255class FHIRSearchParamModifierHandler(FHIRSearchParamHandler):
256 modifiers = {
257 '$asc': ':asc',
258 '$desc': ':desc',
259 '$exact': ':exact',
260 '$missing': ':missing',
261 '$null': ':missing',
262 '$text': ':text',
263 }
264 handles = modifiers.keys()
266 def apply(self, param):
267 if self.key not in self.__class__.modifiers:
268 raise Exception('Unknown modifier "{}" for "{}"'.format(self.key, param.name))
269 param.name += self.__class__.modifiers[self.key]
270 param.value = self.value
273class FHIRSearchParamOperatorHandler(FHIRSearchParamHandler):
274 operators = {
275 '$gt': '>',
276 '$lt': '<',
277 '$lte': '<=',
278 '$gte': '>=',
279 }
280 handles = operators.keys()
282 def apply(self, param):
283 if self.key not in self.__class__.operators:
284 raise Exception('Unknown operator "{}" for "{}"'.format(self.key, param.name))
285 param.value = self.__class__.operators[self.key] + self.value
288class FHIRSearchParamMultiHandler(FHIRSearchParamHandler):
289 handles = ['$and', '$or']
291 def prepare(self, parent):
292 if list != type(self.value):
293 raise Exception('Expecting a list argument for "{}" but got {}'.format(parent.key, self.value))
295 handlers = []
296 for val in self.value:
297 if dict == type(val):
298 for kkey, vval in val.items():
299 handlers.append(FHIRSearchParamHandler.handler_for(kkey)(kkey, vval))
300 else:
301 handlers.append(FHIRSearchParamHandler.handler_for(parent.key)(None, val))
303 if '$and' == self.key:
304 for handler in handlers:
305 handler.prepare(parent)
306 elif '$or' == self.key:
307 ors = [h.value for h in handlers]
308 handler = FHIRSearchParamHandler.handler_for(parent.key)(None, ','.join(ors))
309 handler.prepare(parent)
310 else:
311 raise Exception('I cannot handle "{}"'.format(self.key))
314class FHIRSearchParamTypeHandler(FHIRSearchParamHandler):
315 handles = ['$type']
317 def prepare(self, parent):
318 parent.modifier.append(self)
320 def apply(self, param):
321 param.name = '{}:{}'.format(param.name, self.value)
324# announce all handlers
325FHIRSearchParamHandler.announce_handler(FHIRSearchParamModifierHandler)
326FHIRSearchParamHandler.announce_handler(FHIRSearchParamOperatorHandler)
327FHIRSearchParamHandler.announce_handler(FHIRSearchParamMultiHandler)
328FHIRSearchParamHandler.announce_handler(FHIRSearchParamTypeHandler)