Coverage for /home/martinb/workspace/client-py/fhirclient/auth.py : 21%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
3import uuid
4import logging
5try: # Python 2.x
6 import urlparse
7 from urllib import urlencode
8except Exception as e: # Python 3
9 import urllib.parse as urlparse
10 from urllib.parse import urlencode
12logger = logging.getLogger(__name__)
14class FHIRAuth(object):
15 """ Superclass to handle authorization flow and state.
16 """
17 auth_type = 'none'
18 auth_classes = {}
20 @classmethod
21 def register(cls):
22 """ Register this class to handle authorization types of the given
23 type. """
24 if not cls.auth_type:
25 raise Exception('Class {0} does not specify the auth_type it supports'.format(cls))
26 if cls.auth_type not in FHIRAuth.auth_classes:
27 FHIRAuth.auth_classes[cls.auth_type] = cls
28 elif FHIRAuth.auth_classes[cls.auth_type] != cls:
29 raise Exception('Class {0} is already registered for authorization type "{1}"'.format(FHIRAuth.auth_classes[cls.auth_type], cls.auth_type))
31 @classmethod
32 def from_capability_security(cls, security, state=None):
33 """ Supply a capabilitystatement.rest.security statement and this
34 method will figure out which type of security should be instantiated.
36 :param security: A CapabilityStatementRestSecurity instance
37 :param state: A settings/state dictionary
38 :returns: A FHIRAuth instance or subclass thereof
39 """
40 auth_type = None
42 # look for OAuth2 URLs in SMART security extensions
43 if security is not None and security.extension is not None:
44 for e in security.extension:
45 if "http://fhir-registry.smarthealthit.org/StructureDefinition/oauth-uris" == e.url:
46 if e.extension is not None:
47 for ee in e.extension:
48 if 'token' == ee.url:
49 state['token_uri'] = ee.valueUri
50 elif 'authorize' == ee.url:
51 state['authorize_uri'] = ee.valueUri
52 auth_type = 'oauth2'
53 elif 'register' == ee.url:
54 state['registration_uri'] = ee.valueUri
55 break
56 else:
57 logger.warning("SMART AUTH: invalid `http://fhir-registry.smarthealthit.org/StructureDefinition/oauth-uris` extension: needs to include sub-extensions to define OAuth2 endpoints but there are none")
59 # fallback to old extension URLs
60 elif "http://fhir-registry.smarthealthit.org/StructureDefinition/oauth-uris#register" == e.url:
61 state['registration_uri'] = e.valueUri
62 elif "http://fhir-registry.smarthealthit.org/StructureDefinition/oauth-uris#authorize" == e.url:
63 state['authorize_uri'] = e.valueUri
64 auth_type = 'oauth2'
65 elif "http://fhir-registry.smarthealthit.org/StructureDefinition/oauth-uris#token" == e.url:
66 state['token_uri'] = e.valueUri
68 return cls.create(auth_type, state=state)
70 @classmethod
71 def create(cls, auth_type, state=None):
72 """ Factory method to create the correct subclass for the given
73 authorization type. """
74 if not auth_type:
75 auth_type = 'none'
76 if auth_type in FHIRAuth.auth_classes:
77 klass = FHIRAuth.auth_classes[auth_type]
78 return klass(state=state)
79 raise Exception('No class registered for authorization type "{0}"'.format(auth_type))
82 def __init__(self, state=None):
83 self.app_id = None
84 if state is not None:
85 self.from_state(state)
87 @property
88 def ready(self):
89 """ Indicates whether the authorization part is ready to make
90 resource requests. """
91 return True
93 def reset(self):
94 pass
96 def can_sign_headers(self):
97 return False
99 def authorize_uri(self, server):
100 """ Return the authorize URL to use, if any. """
101 return None
103 def handle_callback(self, url, server):
104 """ Return the launch context. """
105 raise Exception("{0} cannot handle callback URL".format(self))
107 def reauthorize(self):
108 """ Perform a re-authorization of some form.
110 :returns: The launch context dictionary or None on failure
111 """
112 return None
115 # MARK: State
117 @property
118 def state(self):
119 return {
120 'app_id': self.app_id,
121 }
123 def from_state(self, state):
124 """ Update ivars from given state information.
125 """
126 assert state
127 self.app_id = state.get('app_id') or self.app_id
130class FHIROAuth2Auth(FHIRAuth):
131 """ OAuth2 handling class for FHIR servers.
132 """
133 auth_type = 'oauth2'
135 def __init__(self, state=None):
136 self.aud = None
137 self._registration_uri = None
138 self._authorize_uri = None
139 self._redirect_uri = None
140 self._token_uri = None
142 self.auth_state = None
143 self.app_secret = None
144 self.access_token = None
145 self.refresh_token = None
147 super(FHIROAuth2Auth, self).__init__(state=state)
149 @property
150 def ready(self):
151 return True if self.access_token else False
153 def reset(self):
154 super(FHIROAuth2Auth, self).reset()
155 self.access_token = None
156 self.auth_state = None
159 # MARK: Signing/Authorizing Request Headers
161 def can_sign_headers(self):
162 return True if self.access_token is not None else False
164 def signed_headers(self, headers):
165 """ Returns updated HTTP request headers, if possible, raises if there
166 is no access_token.
167 """
168 if not self.can_sign_headers():
169 raise Exception("Cannot sign headers since I have no access token")
171 if headers is None:
172 headers = {}
173 headers['Authorization'] = "Bearer {0}".format(self.access_token)
175 return headers
178 # MARK: OAuth2 Flow
180 def authorize_uri(self, server):
181 """ The URL to authorize against. The `server` param is supplied so
182 that the server can be informed of state changes that need to be
183 stored.
184 """
185 auth_params = self._authorize_params(server)
186 logger.debug("SMART AUTH: Will use parameters for `authorize_uri`: {0}".format(auth_params))
188 # the authorize uri may have params, make sure to not lose them
189 parts = list(urlparse.urlsplit(self._authorize_uri))
190 if len(parts[3]) > 0:
191 args = urlparse.parse_qs(parts[3])
192 args.update(auth_params)
193 auth_params = args
194 parts[3] = urlencode(auth_params, doseq=True)
196 return urlparse.urlunsplit(parts)
198 def _authorize_params(self, server):
199 """ The URL parameters to use when requesting a token code.
200 """
201 if server is None:
202 raise Exception("Cannot create an authorize-uri without server instance")
203 if self.auth_state is None:
204 self.auth_state = str(uuid.uuid4())[:8]
205 server.should_save_state()
207 params = {
208 'response_type': 'code',
209 'client_id': self.app_id,
210 'redirect_uri': self._redirect_uri,
211 'scope': server.desired_scope,
212 'aud': self.aud,
213 'state': self.auth_state,
214 }
215 if server.launch_token is not None:
216 params['launch'] = server.launch_token
217 return params
219 def handle_callback(self, url, server):
220 """ Verify OAuth2 callback URL and exchange the code, if everything
221 goes well, for an access token.
223 :param str url: The callback/redirect URL to handle
224 :param server: The Server instance to use
225 :returns: The launch context dictionary
226 """
227 logger.debug("SMART AUTH: Handling callback URL")
228 if url is None:
229 raise Exception("No callback URL received")
230 try:
231 args = dict(urlparse.parse_qsl(urlparse.urlsplit(url)[3]))
232 except Exception as e:
233 raise Exception("Invalid callback URL: {0}".format(e))
235 # verify response
236 err = self.extract_oauth_error(args)
237 if err is not None:
238 raise Exception(err)
240 stt = args.get('state')
241 if stt is None or self.auth_state != stt:
242 raise Exception("Invalid state, will not use this code. Have: {0}, want: {1}".format(stt, self.auth_state))
244 code = args.get('code')
245 if code is None:
246 raise Exception("Did not receive a code, only have: {0}".format(', '.join(args.keys())))
248 # exchange code for token
249 exchange = self._code_exchange_params(code)
250 return self._request_access_token(server, exchange)
252 def _code_exchange_params(self, code):
253 """ These parameters are used by to exchange the given code for an
254 access token.
255 """
256 return {
257 'client_id': self.app_id,
258 'code': code,
259 'grant_type': 'authorization_code',
260 'redirect_uri': self._redirect_uri,
261 'state': self.auth_state,
262 }
264 def _request_access_token(self, server, params):
265 """ Requests an access token from the instance's server via a form POST
266 request, remembers the token (and patient id if there is one) or
267 raises an Exception.
269 :returns: A dictionary with launch params
270 """
271 if server is None:
272 raise Exception("I need a server to request an access token")
274 logger.debug("SMART AUTH: Requesting access token from {0}".format(self._token_uri))
275 auth = None
276 if self.app_secret:
277 auth = (self.app_id, self.app_secret)
278 ret_params = server.post_as_form(self._token_uri, params, auth).json()
280 self.access_token = ret_params.get('access_token')
281 if self.access_token is None:
282 raise Exception("No access token received")
283 del ret_params['access_token']
285 if 'expires_in' in ret_params:
286 del ret_params['expires_in']
288 # The refresh token issued by the authorization server. If present, the
289 # app should discard any previous refresh_token associated with this
290 # launch, replacing it with this new value.
291 refresh_token = ret_params.get('refresh_token')
292 if refresh_token is not None:
293 self.refresh_token = refresh_token
294 del ret_params['refresh_token']
296 logger.debug("SMART AUTH: Received access token: {0}, refresh token: {1}"
297 .format(self.access_token is not None, self.refresh_token is not None))
298 return ret_params
301 # MARK: Reauthorization
303 def reauthorize(self, server):
304 """ Perform reauthorization.
306 :param server: The Server instance to use
307 :returns: The launch context dictionary, or None on failure
308 """
309 if self.refresh_token is None:
310 logger.debug("SMART AUTH: Cannot reauthorize without refresh token")
311 return None
313 logger.debug("SMART AUTH: Refreshing token")
314 reauth = self._reauthorize_params()
315 return self._request_access_token(server, reauth)
317 def _reauthorize_params(self):
318 """ Parameters to be used in a reauthorize request.
319 """
320 if self.refresh_token is None:
321 raise Exception("Cannot produce reauthorize parameters without refresh token")
322 return {
323 'client_id': self.app_id,
324 #'client_secret': None, # we don't use it
325 'grant_type': 'refresh_token',
326 'refresh_token': self.refresh_token,
327 }
330 # MARK: State
332 @property
333 def state(self):
334 s = super(FHIROAuth2Auth, self).state
335 s['aud'] = self.aud
336 s['registration_uri'] = self._registration_uri
337 s['authorize_uri'] = self._authorize_uri
338 s['redirect_uri'] = self._redirect_uri
339 s['token_uri'] = self._token_uri
340 if self.auth_state is not None:
341 s['auth_state'] = self.auth_state
342 if self.app_secret is not None:
343 s['app_secret'] = self.app_secret
344 if self.access_token is not None:
345 s['access_token'] = self.access_token
346 if self.refresh_token is not None:
347 s['refresh_token'] = self.refresh_token
349 return s
351 def from_state(self, state):
352 """ Update ivars from given state information.
353 """
354 super(FHIROAuth2Auth, self).from_state(state)
355 self.aud = state.get('aud') or self.aud
356 self._registration_uri = state.get('registration_uri') or self._registration_uri
357 self._authorize_uri = state.get('authorize_uri') or self._authorize_uri
358 self._redirect_uri = state.get('redirect_uri') or self._redirect_uri
359 self._token_uri = state.get('token_uri') or self._token_uri
360 self.auth_state = state.get('auth_state') or self.auth_state
361 self.app_secret = state.get('app_secret') or self.app_secret
363 self.access_token = state.get('access_token') or self.access_token
364 self.refresh_token = state.get('refresh_token') or self.refresh_token
367 # MARK: Utilities
369 def extract_oauth_error(self, args):
370 """ Check if an argument dictionary contains OAuth error information.
371 """
372 # "error_description" is optional, we prefer it if it's present
373 if 'error_description' in args:
374 return args['error_description'].replace('+', ' ')
376 # the "error" response is required if there are errors, look for it
377 if 'error' in args:
378 err_code = args['error']
379 if 'invalid_request' == err_code:
380 return "The request is missing a required parameter, includes an invalid parameter value, includes a parameter more than once, or is otherwise malformed."
381 if 'unauthorized_client' == err_code:
382 return "The client is not authorized to request an access token using this method."
383 if 'access_denied' == err_code:
384 return "The resource owner or authorization server denied the request."
385 if 'unsupported_response_type' == err_code:
386 return "The authorization server does not support obtaining an access token using this method."
387 if 'invalid_scope' == err_code:
388 return "The requested scope is invalid, unknown, or malformed."
389 if 'server_error' == err_code:
390 return "The authorization server encountered an unexpected condition that prevented it from fulfilling the request."
391 if 'temporarily_unavailable' == err_code:
392 return "The authorization server is currently unable to handle the request due to a temporary overloading or maintenance of the server."
393 return "Authorization error: {0}.".format(err_code)
395 return None
398# register classes
399FHIRAuth.register()
400FHIROAuth2Auth.register()