Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import inspect 

2 

3from zope.interface import implementer, provider 

4 

5from pyramid.security import NO_PERMISSION_REQUIRED 

6from pyramid.csrf import check_csrf_origin, check_csrf_token 

7from pyramid.response import Response 

8 

9from pyramid.interfaces import ( 

10 IAuthenticationPolicy, 

11 IAuthorizationPolicy, 

12 IDefaultCSRFOptions, 

13 IDefaultPermission, 

14 IDebugLogger, 

15 IResponse, 

16 IViewMapper, 

17 IViewMapperFactory, 

18) 

19 

20from pyramid.compat import is_bound_method, is_unbound_method 

21 

22from pyramid.exceptions import ConfigurationError 

23from pyramid.httpexceptions import HTTPForbidden 

24from pyramid.util import object_description, takes_one_arg 

25from pyramid.view import render_view_to_response 

26from pyramid import renderers 

27 

28 

29def view_description(view): 

30 try: 

31 return view.__text__ 

32 except AttributeError: 

33 # custom view mappers might not add __text__ 

34 return object_description(view) 

35 

36 

37def requestonly(view, attr=None): 

38 return takes_one_arg(view, attr=attr, argname='request') 

39 

40 

41@implementer(IViewMapper) 

42@provider(IViewMapperFactory) 

43class DefaultViewMapper(object): 

44 def __init__(self, **kw): 

45 self.attr = kw.get('attr') 

46 

47 def __call__(self, view): 

48 if is_unbound_method(view) and self.attr is None: 

49 raise ConfigurationError( 

50 ( 

51 'Unbound method calls are not supported, please set the ' 

52 'class as your `view` and the method as your `attr`' 

53 ) 

54 ) 

55 

56 if inspect.isclass(view): 

57 view = self.map_class(view) 

58 else: 

59 view = self.map_nonclass(view) 

60 return view 

61 

62 def map_class(self, view): 

63 ronly = requestonly(view, self.attr) 

64 if ronly: 

65 mapped_view = self.map_class_requestonly(view) 

66 else: 

67 mapped_view = self.map_class_native(view) 

68 mapped_view.__text__ = 'method %s of %s' % ( 

69 self.attr or '__call__', 

70 object_description(view), 

71 ) 

72 return mapped_view 

73 

74 def map_nonclass(self, view): 

75 # We do more work here than appears necessary to avoid wrapping the 

76 # view unless it actually requires wrapping (to avoid function call 

77 # overhead). 

78 mapped_view = view 

79 ronly = requestonly(view, self.attr) 

80 if ronly: 

81 mapped_view = self.map_nonclass_requestonly(view) 

82 elif self.attr: 

83 mapped_view = self.map_nonclass_attr(view) 

84 if inspect.isroutine(mapped_view): 

85 # This branch will be true if the view is a function or a method. 

86 # We potentially mutate an unwrapped object here if it's a 

87 # function. We do this to avoid function call overhead of 

88 # injecting another wrapper. However, we must wrap if the 

89 # function is a bound method because we can't set attributes on a 

90 # bound method. 

91 if is_bound_method(view): 

92 _mapped_view = mapped_view 

93 

94 def mapped_view(context, request): 

95 return _mapped_view(context, request) 

96 

97 if self.attr is not None: 

98 mapped_view.__text__ = 'attr %s of %s' % ( 

99 self.attr, 

100 object_description(view), 

101 ) 

102 else: 

103 mapped_view.__text__ = object_description(view) 

104 return mapped_view 

105 

106 def map_class_requestonly(self, view): 

107 # its a class that has an __init__ which only accepts request 

108 attr = self.attr 

109 

110 def _class_requestonly_view(context, request): 

111 inst = view(request) 

112 request.__view__ = inst 

113 if attr is None: 

114 response = inst() 

115 else: 

116 response = getattr(inst, attr)() 

117 return response 

118 

119 return _class_requestonly_view 

120 

121 def map_class_native(self, view): 

122 # its a class that has an __init__ which accepts both context and 

123 # request 

124 attr = self.attr 

125 

126 def _class_view(context, request): 

127 inst = view(context, request) 

128 request.__view__ = inst 

129 if attr is None: 

130 response = inst() 

131 else: 

132 response = getattr(inst, attr)() 

133 return response 

134 

135 return _class_view 

136 

137 def map_nonclass_requestonly(self, view): 

138 # its a function that has a __call__ which accepts only a single 

139 # request argument 

140 attr = self.attr 

141 

142 def _requestonly_view(context, request): 

143 if attr is None: 

144 response = view(request) 

145 else: 

146 response = getattr(view, attr)(request) 

147 return response 

148 

149 return _requestonly_view 

150 

151 def map_nonclass_attr(self, view): 

152 # its a function that has a __call__ which accepts both context and 

153 # request, but still has an attr 

154 def _attr_view(context, request): 

155 response = getattr(view, self.attr)(context, request) 

156 return response 

157 

158 return _attr_view 

159 

160 

161def wraps_view(wrapper): 

162 def inner(view, info): 

163 wrapper_view = wrapper(view, info) 

164 return preserve_view_attrs(view, wrapper_view) 

165 

166 return inner 

167 

168 

169def preserve_view_attrs(view, wrapper): 

170 if view is None: 

171 return wrapper 

172 

173 if wrapper is view: 

174 return view 

175 

176 original_view = getattr(view, '__original_view__', None) 

177 

178 if original_view is None: 

179 original_view = view 

180 

181 wrapper.__wraps__ = view 

182 wrapper.__original_view__ = original_view 

183 wrapper.__module__ = view.__module__ 

184 wrapper.__doc__ = view.__doc__ 

185 

186 try: 

187 wrapper.__name__ = view.__name__ 

188 except AttributeError: 

189 wrapper.__name__ = repr(view) 

190 

191 # attrs that may not exist on "view", but, if so, must be attached to 

192 # "wrapped view" 

193 for attr in ( 

194 '__permitted__', 

195 '__call_permissive__', 

196 '__permission__', 

197 '__predicated__', 

198 '__predicates__', 

199 '__accept__', 

200 '__order__', 

201 '__text__', 

202 ): 

203 try: 

204 setattr(wrapper, attr, getattr(view, attr)) 

205 except AttributeError: 

206 pass 

207 

208 return wrapper 

209 

210 

211def mapped_view(view, info): 

212 mapper = info.options.get('mapper') 

213 if mapper is None: 

214 mapper = getattr(view, '__view_mapper__', None) 

215 if mapper is None: 

216 mapper = info.registry.queryUtility(IViewMapperFactory) 

217 if mapper is None: 

218 mapper = DefaultViewMapper 

219 

220 mapped_view = mapper(**info.options)(view) 

221 return mapped_view 

222 

223 

224mapped_view.options = ('mapper', 'attr') 

225 

226 

227def owrapped_view(view, info): 

228 wrapper_viewname = info.options.get('wrapper') 

229 viewname = info.options.get('name') 

230 if not wrapper_viewname: 

231 return view 

232 

233 def _owrapped_view(context, request): 

234 response = view(context, request) 

235 request.wrapped_response = response 

236 request.wrapped_body = response.body 

237 request.wrapped_view = view 

238 wrapped_response = render_view_to_response( 

239 context, request, wrapper_viewname 

240 ) 

241 if wrapped_response is None: 

242 raise ValueError( 

243 'No wrapper view named %r found when executing view ' 

244 'named %r' % (wrapper_viewname, viewname) 

245 ) 

246 return wrapped_response 

247 

248 return _owrapped_view 

249 

250 

251owrapped_view.options = ('name', 'wrapper') 

252 

253 

254def http_cached_view(view, info): 

255 if info.settings.get('prevent_http_cache', False): 

256 return view 

257 

258 seconds = info.options.get('http_cache') 

259 

260 if seconds is None: 

261 return view 

262 

263 options = {} 

264 

265 if isinstance(seconds, (tuple, list)): 

266 try: 

267 seconds, options = seconds 

268 except ValueError: 

269 raise ConfigurationError( 

270 'If http_cache parameter is a tuple or list, it must be ' 

271 'in the form (seconds, options); not %s' % (seconds,) 

272 ) 

273 

274 def wrapper(context, request): 

275 response = view(context, request) 

276 prevent_caching = getattr( 

277 response.cache_control, 'prevent_auto', False 

278 ) 

279 if not prevent_caching: 

280 response.cache_expires(seconds, **options) 

281 return response 

282 

283 return wrapper 

284 

285 

286http_cached_view.options = ('http_cache',) 

287 

288 

289def secured_view(view, info): 

290 for wrapper in (_secured_view, _authdebug_view): 

291 view = wraps_view(wrapper)(view, info) 

292 return view 

293 

294 

295secured_view.options = ('permission',) 

296 

297 

298def _secured_view(view, info): 

299 permission = explicit_val = info.options.get('permission') 

300 if permission is None: 

301 permission = info.registry.queryUtility(IDefaultPermission) 

302 if permission == NO_PERMISSION_REQUIRED: 

303 # allow views registered within configurations that have a 

304 # default permission to explicitly override the default 

305 # permission, replacing it with no permission at all 

306 permission = None 

307 

308 wrapped_view = view 

309 authn_policy = info.registry.queryUtility(IAuthenticationPolicy) 

310 authz_policy = info.registry.queryUtility(IAuthorizationPolicy) 

311 

312 # no-op on exception-only views without an explicit permission 

313 if explicit_val is None and info.exception_only: 

314 return view 

315 

316 if authn_policy and authz_policy and (permission is not None): 

317 

318 def permitted(context, request): 

319 principals = authn_policy.effective_principals(request) 

320 return authz_policy.permits(context, principals, permission) 

321 

322 def secured_view(context, request): 

323 result = permitted(context, request) 

324 if result: 

325 return view(context, request) 

326 view_name = getattr(view, '__name__', view) 

327 msg = getattr( 

328 request, 

329 'authdebug_message', 

330 'Unauthorized: %s failed permission check' % view_name, 

331 ) 

332 raise HTTPForbidden(msg, result=result) 

333 

334 wrapped_view = secured_view 

335 wrapped_view.__call_permissive__ = view 

336 wrapped_view.__permitted__ = permitted 

337 wrapped_view.__permission__ = permission 

338 

339 return wrapped_view 

340 

341 

342def _authdebug_view(view, info): 

343 wrapped_view = view 

344 settings = info.settings 

345 permission = explicit_val = info.options.get('permission') 

346 if permission is None: 

347 permission = info.registry.queryUtility(IDefaultPermission) 

348 authn_policy = info.registry.queryUtility(IAuthenticationPolicy) 

349 authz_policy = info.registry.queryUtility(IAuthorizationPolicy) 

350 logger = info.registry.queryUtility(IDebugLogger) 

351 

352 # no-op on exception-only views without an explicit permission 

353 if explicit_val is None and info.exception_only: 

354 return view 

355 

356 if settings and settings.get('debug_authorization', False): 

357 

358 def authdebug_view(context, request): 

359 view_name = getattr(request, 'view_name', None) 

360 

361 if authn_policy and authz_policy: 

362 if permission is NO_PERMISSION_REQUIRED: 

363 msg = 'Allowed (NO_PERMISSION_REQUIRED)' 

364 elif permission is None: 

365 msg = 'Allowed (no permission registered)' 

366 else: 

367 principals = authn_policy.effective_principals(request) 

368 msg = str( 

369 authz_policy.permits(context, principals, permission) 

370 ) 

371 else: 

372 msg = 'Allowed (no authorization policy in use)' 

373 

374 view_name = getattr(request, 'view_name', None) 

375 url = getattr(request, 'url', None) 

376 msg = ( 

377 'debug_authorization of url %s (view name %r against ' 

378 'context %r): %s' % (url, view_name, context, msg) 

379 ) 

380 if logger: 

381 logger.debug(msg) 

382 if request is not None: 

383 request.authdebug_message = msg 

384 return view(context, request) 

385 

386 wrapped_view = authdebug_view 

387 

388 return wrapped_view 

389 

390 

391def rendered_view(view, info): 

392 # one way or another this wrapper must produce a Response (unless 

393 # the renderer is a NullRendererHelper) 

394 renderer = info.options.get('renderer') 

395 if renderer is None: 

396 # register a default renderer if you want super-dynamic 

397 # rendering. registering a default renderer will also allow 

398 # override_renderer to work if a renderer is left unspecified for 

399 # a view registration. 

400 def viewresult_to_response(context, request): 

401 result = view(context, request) 

402 if result.__class__ is Response: # common case 

403 response = result 

404 else: 

405 response = info.registry.queryAdapterOrSelf(result, IResponse) 

406 if response is None: 

407 if result is None: 

408 append = ( 

409 ' You may have forgotten to return a value ' 

410 'from the view callable.' 

411 ) 

412 elif isinstance(result, dict): 

413 append = ( 

414 ' You may have forgotten to define a ' 

415 'renderer in the view configuration.' 

416 ) 

417 else: 

418 append = '' 

419 

420 msg = ( 

421 'Could not convert return value of the view ' 

422 'callable %s into a response object. ' 

423 'The value returned was %r.' + append 

424 ) 

425 

426 raise ValueError(msg % (view_description(view), result)) 

427 

428 return response 

429 

430 return viewresult_to_response 

431 

432 if renderer is renderers.null_renderer: 

433 return view 

434 

435 def rendered_view(context, request): 

436 result = view(context, request) 

437 if result.__class__ is Response: # potential common case 

438 response = result 

439 else: 

440 # this must adapt, it can't do a simple interface check 

441 # (avoid trying to render webob responses) 

442 response = info.registry.queryAdapterOrSelf(result, IResponse) 

443 if response is None: 

444 attrs = getattr(request, '__dict__', {}) 

445 if 'override_renderer' in attrs: 

446 # renderer overridden by newrequest event or other 

447 renderer_name = attrs.pop('override_renderer') 

448 view_renderer = renderers.RendererHelper( 

449 name=renderer_name, 

450 package=info.package, 

451 registry=info.registry, 

452 ) 

453 else: 

454 view_renderer = renderer.clone() 

455 if '__view__' in attrs: 

456 view_inst = attrs.pop('__view__') 

457 else: 

458 view_inst = getattr(view, '__original_view__', view) 

459 response = view_renderer.render_view( 

460 request, result, view_inst, context 

461 ) 

462 return response 

463 

464 return rendered_view 

465 

466 

467rendered_view.options = ('renderer',) 

468 

469 

470def decorated_view(view, info): 

471 decorator = info.options.get('decorator') 

472 if decorator is None: 

473 return view 

474 return decorator(view) 

475 

476 

477decorated_view.options = ('decorator',) 

478 

479 

480def csrf_view(view, info): 

481 explicit_val = info.options.get('require_csrf') 

482 defaults = info.registry.queryUtility(IDefaultCSRFOptions) 

483 if defaults is None: 

484 default_val = False 

485 token = 'csrf_token' 

486 header = 'X-CSRF-Token' 

487 safe_methods = frozenset(["GET", "HEAD", "OPTIONS", "TRACE"]) 

488 callback = None 

489 else: 

490 default_val = defaults.require_csrf 

491 token = defaults.token 

492 header = defaults.header 

493 safe_methods = defaults.safe_methods 

494 callback = defaults.callback 

495 

496 enabled = ( 

497 explicit_val is True 

498 or 

499 # fallback to the default val if not explicitly enabled 

500 # but only if the view is not an exception view 

501 (explicit_val is not False and default_val and not info.exception_only) 

502 ) 

503 # disable if both header and token are disabled 

504 enabled = enabled and (token or header) 

505 wrapped_view = view 

506 if enabled: 

507 

508 def csrf_view(context, request): 

509 if request.method not in safe_methods and ( 

510 callback is None or callback(request) 

511 ): 

512 check_csrf_origin(request, raises=True) 

513 check_csrf_token(request, token, header, raises=True) 

514 return view(context, request) 

515 

516 wrapped_view = csrf_view 

517 return wrapped_view 

518 

519 

520csrf_view.options = ('require_csrf',) 

521 

522VIEW = 'VIEW' 

523INGRESS = 'INGRESS'