Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2 

3import uuid 

4import logging 

5try: # Python 2.x 

6 import urlparse 

7 from urllib import urlencode 

8except Exception as e: # Python 3 

9 import urllib.parse as urlparse 

10 from urllib.parse import urlencode 

11 

12logger = logging.getLogger(__name__) 

13 

14class FHIRAuth(object): 

15 """ Superclass to handle authorization flow and state. 

16 """ 

17 auth_type = 'none' 

18 auth_classes = {} 

19 

20 @classmethod 

21 def register(cls): 

22 """ Register this class to handle authorization types of the given 

23 type. """ 

24 if not cls.auth_type: 

25 raise Exception('Class {0} does not specify the auth_type it supports'.format(cls)) 

26 if cls.auth_type not in FHIRAuth.auth_classes: 

27 FHIRAuth.auth_classes[cls.auth_type] = cls 

28 elif FHIRAuth.auth_classes[cls.auth_type] != cls: 

29 raise Exception('Class {0} is already registered for authorization type "{1}"'.format(FHIRAuth.auth_classes[cls.auth_type], cls.auth_type)) 

30 

31 @classmethod 

32 def from_capability_security(cls, security, state=None): 

33 """ Supply a capabilitystatement.rest.security statement and this 

34 method will figure out which type of security should be instantiated. 

35  

36 :param security: A CapabilityStatementRestSecurity instance 

37 :param state: A settings/state dictionary 

38 :returns: A FHIRAuth instance or subclass thereof 

39 """ 

40 auth_type = None 

41 

42 # look for OAuth2 URLs in SMART security extensions 

43 if security is not None and security.extension is not None: 

44 for e in security.extension: 

45 if "http://fhir-registry.smarthealthit.org/StructureDefinition/oauth-uris" == e.url: 

46 if e.extension is not None: 

47 for ee in e.extension: 

48 if 'token' == ee.url: 

49 state['token_uri'] = ee.valueUri 

50 elif 'authorize' == ee.url: 

51 state['authorize_uri'] = ee.valueUri 

52 auth_type = 'oauth2' 

53 elif 'register' == ee.url: 

54 state['registration_uri'] = ee.valueUri 

55 break 

56 else: 

57 logger.warning("SMART AUTH: invalid `http://fhir-registry.smarthealthit.org/StructureDefinition/oauth-uris` extension: needs to include sub-extensions to define OAuth2 endpoints but there are none") 

58 

59 # fallback to old extension URLs  

60 elif "http://fhir-registry.smarthealthit.org/StructureDefinition/oauth-uris#register" == e.url: 

61 state['registration_uri'] = e.valueUri 

62 elif "http://fhir-registry.smarthealthit.org/StructureDefinition/oauth-uris#authorize" == e.url: 

63 state['authorize_uri'] = e.valueUri 

64 auth_type = 'oauth2' 

65 elif "http://fhir-registry.smarthealthit.org/StructureDefinition/oauth-uris#token" == e.url: 

66 state['token_uri'] = e.valueUri 

67 

68 return cls.create(auth_type, state=state) 

69 

70 @classmethod 

71 def create(cls, auth_type, state=None): 

72 """ Factory method to create the correct subclass for the given 

73 authorization type. """ 

74 if not auth_type: 

75 auth_type = 'none' 

76 if auth_type in FHIRAuth.auth_classes: 

77 klass = FHIRAuth.auth_classes[auth_type] 

78 return klass(state=state) 

79 raise Exception('No class registered for authorization type "{0}"'.format(auth_type)) 

80 

81 

82 def __init__(self, state=None): 

83 self.app_id = None 

84 if state is not None: 

85 self.from_state(state) 

86 

87 @property 

88 def ready(self): 

89 """ Indicates whether the authorization part is ready to make 

90 resource requests. """ 

91 return True 

92 

93 def reset(self): 

94 pass 

95 

96 def can_sign_headers(self): 

97 return False 

98 

99 def authorize_uri(self, server): 

100 """ Return the authorize URL to use, if any. """ 

101 return None 

102 

103 def handle_callback(self, url, server): 

104 """ Return the launch context. """ 

105 raise Exception("{0} cannot handle callback URL".format(self)) 

106 

107 def reauthorize(self): 

108 """ Perform a re-authorization of some form. 

109  

110 :returns: The launch context dictionary or None on failure 

111 """ 

112 return None 

113 

114 

115 # MARK: State 

116 

117 @property 

118 def state(self): 

119 return { 

120 'app_id': self.app_id, 

121 } 

122 

123 def from_state(self, state): 

124 """ Update ivars from given state information. 

125 """ 

126 assert state 

127 self.app_id = state.get('app_id') or self.app_id 

128 

129 

130class FHIROAuth2Auth(FHIRAuth): 

131 """ OAuth2 handling class for FHIR servers. 

132 """ 

133 auth_type = 'oauth2' 

134 

135 def __init__(self, state=None): 

136 self.aud = None 

137 self._registration_uri = None 

138 self._authorize_uri = None 

139 self._redirect_uri = None 

140 self._token_uri = None 

141 

142 self.auth_state = None 

143 self.app_secret = None 

144 self.access_token = None 

145 self.refresh_token = None 

146 

147 super(FHIROAuth2Auth, self).__init__(state=state) 

148 

149 @property 

150 def ready(self): 

151 return True if self.access_token else False 

152 

153 def reset(self): 

154 super(FHIROAuth2Auth, self).reset() 

155 self.access_token = None 

156 self.auth_state = None 

157 

158 

159 # MARK: Signing/Authorizing Request Headers 

160 

161 def can_sign_headers(self): 

162 return True if self.access_token is not None else False 

163 

164 def signed_headers(self, headers): 

165 """ Returns updated HTTP request headers, if possible, raises if there 

166 is no access_token. 

167 """ 

168 if not self.can_sign_headers(): 

169 raise Exception("Cannot sign headers since I have no access token") 

170 

171 if headers is None: 

172 headers = {} 

173 headers['Authorization'] = "Bearer {0}".format(self.access_token) 

174 

175 return headers 

176 

177 

178 # MARK: OAuth2 Flow 

179 

180 def authorize_uri(self, server): 

181 """ The URL to authorize against. The `server` param is supplied so 

182 that the server can be informed of state changes that need to be 

183 stored. 

184 """ 

185 auth_params = self._authorize_params(server) 

186 logger.debug("SMART AUTH: Will use parameters for `authorize_uri`: {0}".format(auth_params)) 

187 

188 # the authorize uri may have params, make sure to not lose them 

189 parts = list(urlparse.urlsplit(self._authorize_uri)) 

190 if len(parts[3]) > 0: 

191 args = urlparse.parse_qs(parts[3]) 

192 args.update(auth_params) 

193 auth_params = args 

194 parts[3] = urlencode(auth_params, doseq=True) 

195 

196 return urlparse.urlunsplit(parts) 

197 

198 def _authorize_params(self, server): 

199 """ The URL parameters to use when requesting a token code. 

200 """ 

201 if server is None: 

202 raise Exception("Cannot create an authorize-uri without server instance") 

203 if self.auth_state is None: 

204 self.auth_state = str(uuid.uuid4())[:8] 

205 server.should_save_state() 

206 

207 params = { 

208 'response_type': 'code', 

209 'client_id': self.app_id, 

210 'redirect_uri': self._redirect_uri, 

211 'scope': server.desired_scope, 

212 'aud': self.aud, 

213 'state': self.auth_state, 

214 } 

215 if server.launch_token is not None: 

216 params['launch'] = server.launch_token 

217 return params 

218 

219 def handle_callback(self, url, server): 

220 """ Verify OAuth2 callback URL and exchange the code, if everything 

221 goes well, for an access token. 

222  

223 :param str url: The callback/redirect URL to handle 

224 :param server: The Server instance to use 

225 :returns: The launch context dictionary 

226 """ 

227 logger.debug("SMART AUTH: Handling callback URL") 

228 if url is None: 

229 raise Exception("No callback URL received") 

230 try: 

231 args = dict(urlparse.parse_qsl(urlparse.urlsplit(url)[3])) 

232 except Exception as e: 

233 raise Exception("Invalid callback URL: {0}".format(e)) 

234 

235 # verify response 

236 err = self.extract_oauth_error(args) 

237 if err is not None: 

238 raise Exception(err) 

239 

240 stt = args.get('state') 

241 if stt is None or self.auth_state != stt: 

242 raise Exception("Invalid state, will not use this code. Have: {0}, want: {1}".format(stt, self.auth_state)) 

243 

244 code = args.get('code') 

245 if code is None: 

246 raise Exception("Did not receive a code, only have: {0}".format(', '.join(args.keys()))) 

247 

248 # exchange code for token 

249 exchange = self._code_exchange_params(code) 

250 return self._request_access_token(server, exchange) 

251 

252 def _code_exchange_params(self, code): 

253 """ These parameters are used by to exchange the given code for an 

254 access token. 

255 """ 

256 return { 

257 'client_id': self.app_id, 

258 'code': code, 

259 'grant_type': 'authorization_code', 

260 'redirect_uri': self._redirect_uri, 

261 'state': self.auth_state, 

262 } 

263 

264 def _request_access_token(self, server, params): 

265 """ Requests an access token from the instance's server via a form POST 

266 request, remembers the token (and patient id if there is one) or 

267 raises an Exception. 

268  

269 :returns: A dictionary with launch params 

270 """ 

271 if server is None: 

272 raise Exception("I need a server to request an access token") 

273 

274 logger.debug("SMART AUTH: Requesting access token from {0}".format(self._token_uri)) 

275 auth = None 

276 if self.app_secret: 

277 auth = (self.app_id, self.app_secret) 

278 ret_params = server.post_as_form(self._token_uri, params, auth).json() 

279 

280 self.access_token = ret_params.get('access_token') 

281 if self.access_token is None: 

282 raise Exception("No access token received") 

283 del ret_params['access_token'] 

284 

285 if 'expires_in' in ret_params: 

286 del ret_params['expires_in'] 

287 

288 # The refresh token issued by the authorization server. If present, the 

289 # app should discard any previous refresh_token associated with this 

290 # launch, replacing it with this new value. 

291 refresh_token = ret_params.get('refresh_token') 

292 if refresh_token is not None: 

293 self.refresh_token = refresh_token 

294 del ret_params['refresh_token'] 

295 

296 logger.debug("SMART AUTH: Received access token: {0}, refresh token: {1}" 

297 .format(self.access_token is not None, self.refresh_token is not None)) 

298 return ret_params 

299 

300 

301 # MARK: Reauthorization 

302 

303 def reauthorize(self, server): 

304 """ Perform reauthorization. 

305  

306 :param server: The Server instance to use 

307 :returns: The launch context dictionary, or None on failure 

308 """ 

309 if self.refresh_token is None: 

310 logger.debug("SMART AUTH: Cannot reauthorize without refresh token") 

311 return None 

312 

313 logger.debug("SMART AUTH: Refreshing token") 

314 reauth = self._reauthorize_params() 

315 return self._request_access_token(server, reauth) 

316 

317 def _reauthorize_params(self): 

318 """ Parameters to be used in a reauthorize request. 

319 """ 

320 if self.refresh_token is None: 

321 raise Exception("Cannot produce reauthorize parameters without refresh token") 

322 return { 

323 'client_id': self.app_id, 

324 #'client_secret': None, # we don't use it 

325 'grant_type': 'refresh_token', 

326 'refresh_token': self.refresh_token, 

327 } 

328 

329 

330 # MARK: State 

331 

332 @property 

333 def state(self): 

334 s = super(FHIROAuth2Auth, self).state 

335 s['aud'] = self.aud 

336 s['registration_uri'] = self._registration_uri 

337 s['authorize_uri'] = self._authorize_uri 

338 s['redirect_uri'] = self._redirect_uri 

339 s['token_uri'] = self._token_uri 

340 if self.auth_state is not None: 

341 s['auth_state'] = self.auth_state 

342 if self.app_secret is not None: 

343 s['app_secret'] = self.app_secret 

344 if self.access_token is not None: 

345 s['access_token'] = self.access_token 

346 if self.refresh_token is not None: 

347 s['refresh_token'] = self.refresh_token 

348 

349 return s 

350 

351 def from_state(self, state): 

352 """ Update ivars from given state information. 

353 """ 

354 super(FHIROAuth2Auth, self).from_state(state) 

355 self.aud = state.get('aud') or self.aud 

356 self._registration_uri = state.get('registration_uri') or self._registration_uri 

357 self._authorize_uri = state.get('authorize_uri') or self._authorize_uri 

358 self._redirect_uri = state.get('redirect_uri') or self._redirect_uri 

359 self._token_uri = state.get('token_uri') or self._token_uri 

360 self.auth_state = state.get('auth_state') or self.auth_state 

361 self.app_secret = state.get('app_secret') or self.app_secret 

362 

363 self.access_token = state.get('access_token') or self.access_token 

364 self.refresh_token = state.get('refresh_token') or self.refresh_token 

365 

366 

367 # MARK: Utilities  

368 

369 def extract_oauth_error(self, args): 

370 """ Check if an argument dictionary contains OAuth error information. 

371 """ 

372 # "error_description" is optional, we prefer it if it's present 

373 if 'error_description' in args: 

374 return args['error_description'].replace('+', ' ') 

375 

376 # the "error" response is required if there are errors, look for it 

377 if 'error' in args: 

378 err_code = args['error'] 

379 if 'invalid_request' == err_code: 

380 return "The request is missing a required parameter, includes an invalid parameter value, includes a parameter more than once, or is otherwise malformed." 

381 if 'unauthorized_client' == err_code: 

382 return "The client is not authorized to request an access token using this method." 

383 if 'access_denied' == err_code: 

384 return "The resource owner or authorization server denied the request." 

385 if 'unsupported_response_type' == err_code: 

386 return "The authorization server does not support obtaining an access token using this method." 

387 if 'invalid_scope' == err_code: 

388 return "The requested scope is invalid, unknown, or malformed." 

389 if 'server_error' == err_code: 

390 return "The authorization server encountered an unexpected condition that prevented it from fulfilling the request." 

391 if 'temporarily_unavailable' == err_code: 

392 return "The authorization server is currently unable to handle the request due to a temporary overloading or maintenance of the server." 

393 return "Authorization error: {0}.".format(err_code) 

394 

395 return None 

396 

397 

398# register classes 

399FHIRAuth.register() 

400FHIROAuth2Auth.register()