Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Create FHIR search params from NoSQL-like query structures. 

5# 2014, SMART Health IT. 

6 

7import logging 

8 

9from . import fhirreference 

10 

11try: 

12 from urllib import quote_plus 

13except Exception as e: 

14 from urllib.parse import quote_plus 

15 

16logger = logging.getLogger(__name__) 

17 

18 

class FHIRSearch(object):
    """ Create a FHIR search from NoSQL-like query structures.

    A search is bound to a resource type class and collects search
    parameters plus optional `_include` / `_revinclude` directives.
    `construct()` renders them as a relative URL with query string and
    `perform()` / `perform_resources()` execute that URL against a server.
    """

    def __init__(self, resource_type, struct=None):
        """ Initialize a search for the given resource type.

        :param resource_type: The resource type class; its `resource_type`
            attribute is used as the path of the search URL
        :param struct: Optional dictionary (or dict subclass) describing the
            query; every key/value pair becomes one `FHIRSearchParam` that
            is expanded when the URL is constructed
        :raises Exception: If `struct` is given but is not a dictionary
        """
        # The resource type class.
        self.resource_type = resource_type

        # FHIRSearchParam instances.
        self.params = []

        # Used internally; whether or not `params` must be expanded first.
        self.wants_expand = False

        # Used internally; list of (reference_model, reference_field,
        # reverse) tuples describing the resources to include.
        self.includes = []

        if struct is not None:
            # isinstance (rather than an exact type match) so that dict
            # subclasses such as OrderedDict are accepted as well
            if not isinstance(struct, dict):
                raise Exception("Must pass a Python dictionary, but got a {}".format(type(struct)))
            self.wants_expand = True
            for key, val in struct.items():
                self.params.append(FHIRSearchParam(key, val))


    # MARK: Execution

    def construct(self):
        """ Constructs the URL with query string from the receiver's params.

        :returns: A string of the form "ResourceType?key=value&..."
        :raises Exception: If no resource type is set
        """
        if self.resource_type is None:
            raise Exception("Need resource_type set to construct a search query")

        parts = []
        if self.params is not None:
            for param in self.params:
                if self.wants_expand:
                    # params created from a query structure may expand
                    # into several concrete parameters
                    parts.extend(expanded.as_parameter() for expanded in param.handle())
                else:
                    parts.append(param.as_parameter())

        for reference_model, reference_field, reverse in self.includes:
            key = '_revinclude' if reverse else '_include'
            parts.append('{}={}:{}'.format(
                key, reference_model.resource_type, reference_field
            ))

        return '{}?{}'.format(self.resource_type.resource_type, '&'.join(parts))

    def include(self, reference_field, reference_model=None, reverse=False):
        """ Add a resource to be included in the search results. Includes will
        fetch additional resources referred to by the search results, or
        additional resources which themselves refer to the search results
        (reverse include). Recursive or iterative includes are not supported.
        Provides a fluent interface to allow method chaining.

        To include Patient resources when searching Observations:
            `s = FHIRSearch(Observation).include('subject')`
        To include Observation resources when searching Patients:
            `s = FHIRSearch(Patient).include('subject', Observation, reverse=True)`

        If the field is not a reference element of the model, a warning is
        logged and the include is ignored.

        :param reference_field: The name of the search parameter (must be
            FHIRReference type)
        :param reference_model: The type of the source resource from which the
            join comes (only used for reverse includes)
        :param reverse: Whether this is a reverse include
        :returns: This FHIRSearch instance
        """

        if reference_model is None:
            reference_model = self.resource_type

        # map element name -> element type; only these two columns of the
        # six-tuples returned by `elementProperties()` are needed
        model_fields = {
            name: typ
            for name, _, typ, _, _, _
            in reference_model().elementProperties()
        }

        if model_fields.get(reference_field) is not fhirreference.FHIRReference:
            logger.warning(
                '%s does not have a reference type element named %s',
                reference_model.resource_type, reference_field
            )
            return self

        if reference_model is not self.resource_type and not reverse:
            # a foreign model only makes sense for a reverse include, so
            # switch to one instead of dropping the request
            logger.warning('Only reverse includes can have a different reference model')
            reverse = True

        self.includes.append((reference_model, reference_field, reverse))
        return self

    def perform(self, server):
        """ Construct the search URL and execute it against the given server.

        :param server: The server against which to perform the search
        :returns: A Bundle resource
        :raises Exception: If no server is given
        """
        if server is None:
            raise Exception("Need a server to perform search")

        # imported here rather than at module level
        from . import bundle
        res = server.request_json(self.construct())
        result = bundle.Bundle(res)
        result.origin_server = server
        return result

    def perform_resources(self, server):
        """ Performs the search by calling `perform`, then extracts all Bundle
        entries and returns a list of Resource instances.

        :param server: The server against which to perform the search
        :returns: A list of Resource instances
        """
        bundle = self.perform(server)
        if bundle is None or bundle.entry is None:
            return []
        return [entry.resource for entry in bundle.entry]

142 

143 

class FHIRSearchParam(object):
    """ Holds one search parameter.

    The instance's `value` can either be a plain value or a search construct
    dictionary. In the latter case the class's `handle` method must be called
    to arrive at search parameter instances that can be converted into a URL
    query.
    """

    def __init__(self, name, value):
        """
        :param name: The parameter name, used as the key of the query part
        :param value: A plain value or a query structure (dictionary)
        """
        self.name = name
        self.value = value

    def copy(self):
        """ Return a shallow copy of the receiver without calling `__init__`.
        """
        clone = object.__new__(self.__class__)
        clone.__dict__ = self.__dict__.copy()
        return clone

    def handle(self):
        """ Parses the receiver's value and returns a list of FHIRSearchParam
        instances. Needs only be called if the param needs to be handled, i.e.
        its value is a query structure.

        :returns: A list with one or more FHIRSearchParam instances, not
            altering the receiver
        """
        handler = FHIRSearchParamHandler.handler_for(self.name)(None, self.value)
        # hand a copy to the handler so the receiver stays untouched
        return handler.handle(self.copy())

    def as_parameter(self):
        """ Return a string that represents the receiver as "key=value".

        Booleans are rendered as lowercase "true"/"false" and numbers are
        converted to their string form, because `quote_plus` only accepts
        strings. Commas and the comparison characters `<`, `=`, `>` are left
        unescaped in the value.
        """
        value = self.value
        # bool must be tested first: it is a subclass of int
        if isinstance(value, bool):
            value = 'true' if value else 'false'
        elif isinstance(value, (int, float)):
            value = str(value)
        return '{}={}'.format(self.name, quote_plus(value, safe=',<=>'))

177 

178 

class FHIRSearchParamHandler(object):
    """ Base class for handlers that turn one key/value pair of a query
    structure into changes on `FHIRSearchParam` instances.

    Subclasses declare the keys they are responsible for in `handles` and
    are registered via `announce_handler`. The base class handles every key
    that no registered handler claims.
    """

    # Keys this handler class is responsible for; None means "any key".
    handles = None

    # Registry of announced handler classes, consulted in announcement order.
    handlers = []

    @classmethod
    def announce_handler(cls, handler):
        """ Register a handler class so `handler_for` can find it. """
        cls.handlers.append(handler)

    @classmethod
    def handler_for(cls, key):
        """ Return the first announced handler class that can handle `key`,
        falling back to the class this is called on.
        """
        for handler in cls.handlers:
            if handler.can_handle(key):
                return handler
        return cls

    @classmethod
    def can_handle(cls, key):
        """ Whether this handler class is responsible for `key`. """
        if cls.handles is not None:
            return key in cls.handles
        return True         # base class handles everything else, so be sure to test it last!


    def __init__(self, key, value):
        """
        :param key: The key from the query structure, or None for the root
        :param value: The value found under that key
        """
        self.key = key
        self.value = value
        # handlers that alter the param in place before it is expanded
        self.modifier = []
        # handlers that each produce their own copy of the param
        self.multiplier = []

    def handle(self, param):
        """ Applies all handlers to the given search parameter.

        :returns: A list of one or more new `FHIRSearchParam` instances
        """
        self.prepare()
        return self.expand(param)

    def prepare(self, parent=None):
        """ Creates sub-handlers as needed, then prepares the receiver.

        :param parent: The handler this one is nested in, if any; the
            receiver registers itself as one of the parent's multipliers
        """
        # isinstance (rather than an exact type match) so that dict
        # subclasses such as OrderedDict are expanded as query structures
        if isinstance(self.value, dict):
            for key, val in self.value.items():
                handler = FHIRSearchParamHandler.handler_for(key)(key, val)
                handler.prepare(self)

        if parent is not None:
            parent.multiplier.append(self)

    def expand(self, param):
        """ Executes the receiver's modifier and multiplier on itself, applying
        changes to the given search param instance.

        :returns: A list of one or more FHIRSearchParam instances
        """
        for handler in self.modifier:
            handler.expand(param)

        self.apply(param)

        # if we have multiplier, expand sequentially; each one works on its
        # own copy of the param
        if self.multiplier:
            expanded = []
            for handler in self.multiplier:
                expanded.extend(handler.expand(param.copy()))
            return expanded

        # no multiplier, just return the passed-in parameter
        return [param]

    def apply(self, param):
        """ Apply the receiver's key and value to the given param: the key is
        appended to the name as ".key" and, unless sub-handlers supply the
        value, the value is set.
        """
        if self.key is not None:
            param.name = '{}.{}'.format(param.name, self.key)
        if not self.multiplier:
            param.value = self.value

253 

254 

class FHIRSearchParamModifierHandler(FHIRSearchParamHandler):
    """ Handles keys that translate into a suffix on the parameter name,
    e.g. "$exact" becomes ":exact".
    """

    # query-structure key -> suffix appended to the parameter name
    modifiers = {
        '$asc': ':asc',
        '$desc': ':desc',
        '$exact': ':exact',
        '$missing': ':missing',
        '$null': ':missing',
        '$text': ':text',
    }
    handles = modifiers.keys()

    def apply(self, param):
        """ Append the suffix for the receiver's key to the param's name and
        set the param's value to the receiver's value.

        :raises Exception: If the receiver's key is not a known modifier
        """
        suffixes = type(self).modifiers
        if self.key in suffixes:
            param.name += suffixes[self.key]
            param.value = self.value
            return
        raise Exception('Unknown modifier "{}" for "{}"'.format(self.key, param.name))

271 

272 

class FHIRSearchParamOperatorHandler(FHIRSearchParamHandler):
    """ Handles comparison keys by prefixing the parameter value with the
    matching comparison characters, e.g. `{'$gte': '5'}` becomes ">=5".
    """

    # query-structure key -> prefix put in front of the value
    operators = {
        '$gt': '>',
        '$lt': '<',
        '$lte': '<=',
        '$gte': '>=',
    }
    handles = operators.keys()

    def apply(self, param):
        """ Set the param's value to the operator prefix followed by the
        receiver's value. The param's name is left untouched.

        :raises Exception: If the receiver's key is not a known operator
        """
        if self.key not in self.__class__.operators:
            raise Exception('Unknown operator "{}" for "{}"'.format(self.key, param.name))

        value = self.value
        # numbers cannot be concatenated to the prefix directly; bool is
        # excluded because comparing against true/false is not meaningful
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            value = str(value)
        param.value = self.__class__.operators[self.key] + value

286 

287 

class FHIRSearchParamMultiHandler(FHIRSearchParamHandler):
    """ Handles the "$and" and "$or" keys, whose value is a list of
    alternatives for the parent's parameter.
    """
    handles = ['$and', '$or']

    def prepare(self, parent):
        """ Build one handler per list item and attach the result to `parent`.

        For "$and" every item handler is prepared against the parent. For
        "$or" the item values are joined with commas into one handler that
        is prepared against the parent.

        :param parent: The handler this one is nested in
        :raises Exception: If the value is not a list or the key is neither
            "$and" nor "$or"
        """
        if type(self.value) is not list:
            raise Exception('Expecting a list argument for "{}" but got {}'.format(parent.key, self.value))

        # dictionaries contribute one handler per entry, anything else is
        # treated as a plain value for the parent's key
        item_handlers = []
        for item in self.value:
            if type(item) is dict:
                for name, argument in item.items():
                    handler_class = FHIRSearchParamHandler.handler_for(name)
                    item_handlers.append(handler_class(name, argument))
            else:
                handler_class = FHIRSearchParamHandler.handler_for(parent.key)
                item_handlers.append(handler_class(None, item))

        if self.key == '$or':
            alternatives = [item_handler.value for item_handler in item_handlers]
            handler_class = FHIRSearchParamHandler.handler_for(parent.key)
            combined = handler_class(None, ','.join(alternatives))
            combined.prepare(parent)
        elif self.key == '$and':
            for item_handler in item_handlers:
                item_handler.prepare(parent)
        else:
            raise Exception('I cannot handle "{}"'.format(self.key))

312 

313 

class FHIRSearchParamTypeHandler(FHIRSearchParamHandler):
    """ Handles the "$type" key by appending ":<value>" to the parameter
    name.
    """
    handles = ['$type']

    def prepare(self, parent):
        """ Register the receiver as a modifier (not a multiplier) of
        `parent`.
        """
        parent_modifiers = parent.modifier
        parent_modifiers.append(self)

    def apply(self, param):
        """ Append the receiver's value to the param's name, separated by a
        colon. The param's value is not touched.
        """
        typed_name = '{0}:{1}'.format(param.name, self.value)
        param.name = typed_name

322 

323 

# announce all handlers; the order is the order in which `handler_for`
# consults them
for _handler_class in (
    FHIRSearchParamModifierHandler,
    FHIRSearchParamOperatorHandler,
    FHIRSearchParamMultiHandler,
    FHIRSearchParamTypeHandler,
):
    FHIRSearchParamHandler.announce_handler(_handler_class)
del _handler_class

329