Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import uuid 

2 

3from webob.cookies import CookieProfile 

4from zope.interface import implementer 

5 

6 

7from pyramid.compat import bytes_, urlparse, text_ 

8from pyramid.exceptions import BadCSRFOrigin, BadCSRFToken 

9from pyramid.interfaces import ICSRFStoragePolicy 

10from pyramid.settings import aslist 

11from pyramid.util import SimpleSerializer, is_same_domain, strings_differ 

12 

13 

@implementer(ICSRFStoragePolicy)
class LegacySessionCSRFStoragePolicy(object):
    """ CSRF storage policy that hands token management over to the session.

    It exists for compatibility with legacy ISession implementations which
    manage CSRF tokens on their own through ``ISession.new_csrf_token`` and
    ``ISession.get_csrf_token``.

    A :term:`session factory` must be configured for this policy to work,
    because every operation goes through ``request.session``.

    .. versionadded:: 1.9

    """

    def new_csrf_token(self, request):
        """ Ask the session to create a new CSRF token and return it. """
        session = request.session
        return session.new_csrf_token()

    def get_csrf_token(self, request):
        """ Return whatever the session reports as the active CSRF token
        (the session is responsible for creating one when needed)."""
        session = request.session
        return session.get_csrf_token()

    def check_csrf_token(self, request, supplied_token):
        """ Return ``True`` when ``supplied_token`` equals the session's
        current token, ``False`` otherwise."""
        # Both values are normalised with ``bytes_`` before comparison.
        expected = bytes_(self.get_csrf_token(request))
        supplied = bytes_(supplied_token)
        return not strings_differ(expected, supplied)

45 

46 

@implementer(ICSRFStoragePolicy)
class SessionCSRFStoragePolicy(object):
    """ CSRF storage policy that keeps the CSRF token inside the session.

    A :term:`session factory` must be configured for this policy to work,
    because the token is read from and written to ``request.session``.

    ``key``

        The session key under which the CSRF token is stored.
        Default: `_csrft_`.

    .. versionadded:: 1.9

    """

    @staticmethod
    def _token_factory():
        # A token is the hex form of a random UUID4, coerced with ``text_``.
        return text_(uuid.uuid4().hex)

    def __init__(self, key='_csrft_'):
        self.key = key

    def new_csrf_token(self, request):
        """ Create a new CSRF token, store it in the session under
        ``self.key`` and return it. """
        fresh_token = self._token_factory()
        request.session[self.key] = fresh_token
        return fresh_token

    def get_csrf_token(self, request):
        """ Return the token stored in the session; when the stored value is
        missing or falsy, create, store and return a new one."""
        stored_token = request.session.get(self.key, None)
        return stored_token or self.new_csrf_token(request)

    def check_csrf_token(self, request, supplied_token):
        """ Return ``True`` when ``supplied_token`` equals the session's
        current token, ``False`` otherwise."""
        # Both values are normalised with ``bytes_`` before comparison.
        expected = bytes_(self.get_csrf_token(request))
        supplied = bytes_(supplied_token)
        return not strings_differ(expected, supplied)

88 

89 

@implementer(ICSRFStoragePolicy)
class CookieCSRFStoragePolicy(object):
    """ An alternative CSRF implementation that stores its information in
    unauthenticated cookies, known as the 'Double Submit Cookie' method in the
    `OWASP CSRF guidelines <https://www.owasp.org/index.php/
    Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet#
    Double_Submit_Cookie>`_. This gives some additional flexibility with
    regards to scaling as the tokens can be generated and verified by a
    front-end server.

    All constructor arguments are forwarded to a
    :class:`webob.cookies.CookieProfile`; ``domain`` is passed along as the
    single-element list ``domains=[domain]``.

    .. versionadded:: 1.9

    .. versionchanged:: 1.10

       Added the ``samesite`` option and made the default ``'Lax'``.

    """

    @staticmethod
    def _token_factory():
        # A token is the hex form of a random UUID4, coerced with ``text_``.
        return text_(uuid.uuid4().hex)

    def __init__(
        self,
        cookie_name='csrf_token',
        secure=False,
        httponly=False,
        domain=None,
        max_age=None,
        path='/',
        samesite='Lax',
    ):
        profile_options = dict(
            cookie_name=cookie_name,
            secure=secure,
            max_age=max_age,
            httponly=httponly,
            path=path,
            domains=[domain],
            serializer=SimpleSerializer(),
            samesite=samesite,
        )
        self.cookie_profile = CookieProfile(**profile_options)
        self.cookie_name = cookie_name

    def new_csrf_token(self, request):
        """ Create a new CSRF token, expose it on the current request's
        cookies, schedule it to be set on the response, and return it. """
        fresh_token = self._token_factory()
        # Make the token visible to the rest of the current request.
        request.cookies[self.cookie_name] = fresh_token

        def _attach_cookie(_request, response):
            # Runs as a response callback: emit the cookie for the token.
            self.cookie_profile.set_cookies(response, fresh_token)

        request.add_response_callback(_attach_cookie)
        return fresh_token

    def get_csrf_token(self, request):
        """ Return the token found in the cookies sent with the current
        request; when there is none, create and return a new one."""
        cookie_token = self.cookie_profile.bind(request).get_value()
        return cookie_token or self.new_csrf_token(request)

    def check_csrf_token(self, request, supplied_token):
        """ Return ``True`` when ``supplied_token`` equals the current cookie
        token, ``False`` otherwise."""
        # Both values are normalised with ``bytes_`` before comparison.
        expected = bytes_(self.get_csrf_token(request))
        supplied = bytes_(supplied_token)
        return not strings_differ(expected, supplied)

159 

160 

def get_csrf_token(request):
    """ Return the currently active CSRF token for ``request``.

    The work is delegated to the ``get_csrf_token`` method of the
    :class:`pyramid.interfaces.ICSRFStoragePolicy` utility registered in
    ``request.registry``; the policies in this module generate a new token
    when one does not exist yet.

    .. versionadded:: 1.9

    """
    policy = request.registry.getUtility(ICSRFStoragePolicy)
    return policy.get_csrf_token(request)

172 

173 

def new_csrf_token(request):
    """ Generate a new CSRF token for ``request`` and return it.

    The work is delegated to the ``new_csrf_token`` method of the
    :class:`pyramid.interfaces.ICSRFStoragePolicy` utility registered in
    ``request.registry``, which persists the token in an implementation
    defined manner.

    .. versionadded:: 1.9

    """
    policy = request.registry.getUtility(ICSRFStoragePolicy)
    return policy.new_csrf_token(request)

185 

186 

def check_csrf_token(
    request, token='csrf_token', header='X-CSRF-Token', raises=True
):
    """ Verify the CSRF token supplied with ``request`` against the
    configured :class:`pyramid.interfaces.ICSRFStoragePolicy`.

    The supplied token is looked up in ``request.headers.get(header)`` first
    and, when that yields an empty string, in ``request.POST.get(token)``.
    ``header`` defaults to ``X-CSRF-Token`` and ``token`` defaults to
    ``csrf_token``; passing ``None`` for either one skips that lookup.

    If the storage policy rejects the supplied value and ``raises`` is
    ``True``, :exc:`pyramid.exceptions.BadCSRFToken` is raised. If it is
    rejected and ``raises`` is ``False``, ``False`` is returned. If the
    check succeeds, ``True`` is returned unconditionally.

    See :ref:`auto_csrf_checking` for information about how to secure your
    application automatically against CSRF attacks.

    .. versionadded:: 1.4a2

    .. versionchanged:: 1.7a1
       A CSRF token passed in the query string of the request is no longer
       considered valid. It must be passed in either the request body or
       a header.

    .. versionchanged:: 1.9
       Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` and updated
       to use the configured :class:`pyramid.interfaces.ICSRFStoragePolicy` to
       verify the CSRF token.

    """
    candidate = ""

    # Headers are consulted first because that is significantly cheaper than
    # parsing the POST body.
    if header is not None:
        candidate = request.headers.get(header, "")

    # Fall back to the request body only. ``request.POST`` is used on purpose:
    # a CSRF token must never travel in a URL, and form encoded data is only
    # supported through ``request.POST``.
    if token is not None and candidate == "":
        candidate = request.POST.get(token, "")

    storage_policy = request.registry.getUtility(ICSRFStoragePolicy)
    if storage_policy.check_csrf_token(request, text_(candidate)):
        return True

    if raises:
        raise BadCSRFToken('check_csrf_token(): Invalid token')
    return False

243 

244 

def check_csrf_origin(request, trusted_origins=None, raises=True):
    """
    Check the ``Origin`` of the request to see if it is a cross site request or
    not.

    If the value supplied by the ``Origin`` or ``Referer`` header isn't one of
    the trusted origins and ``raises`` is ``True``, this function will raise a
    :exc:`pyramid.exceptions.BadCSRFOrigin` exception, but if ``raises`` is
    ``False``, this function will return ``False`` instead. If the CSRF origin
    checks are successful this function will return ``True`` unconditionally.

    Additional trusted origins may be added by passing a list of domain (and
    ports if non-standard like ``['example.com', 'dev.example.com:8080']``) in
    with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None``
    (the default) this list of additional domains will be pulled from the
    ``pyramid.csrf_trusted_origins`` setting. The list passed in is never
    modified; the current request's own host is added to a private copy.

    Note that this function performs no checks and returns ``True`` if
    ``request.scheme`` is not ``https``.

    .. versionadded:: 1.7

    .. versionchanged:: 1.9
       Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf`

    """

    def _fail(reason):
        # Single exit point for failures: raise or report, per ``raises``.
        if raises:
            raise BadCSRFOrigin(reason)
        return False

    # Origin checking is only performed for HTTPS requests.
    if request.scheme != "https":
        return True

    # Suppose user visits http://example.com/
    # An active network attacker (man-in-the-middle, MITM) sends a
    # POST form that targets https://example.com/detonate-bomb/ and
    # submits it via JavaScript.
    #
    # The attacker will need to provide a CSRF cookie and token, but
    # that's no problem for a MITM when we cannot make any assumptions
    # about what kind of session storage is being used. So the MITM can
    # circumvent the CSRF protection. This is true for any HTTP connection,
    # but anyone using HTTPS expects better! For this reason, for
    # https://example.com/ we need additional protection that treats
    # http://example.com/ as completely untrusted. Under HTTPS,
    # Barth et al. found that the Referer header is missing for
    # same-domain requests in only about 0.2% of cases or less, so
    # we can use strict Referer checking.

    # Determine the origin of this request, preferring the Origin header and
    # falling back to the Referer.
    origin = request.headers.get("Origin")
    if origin is None:
        origin = request.referrer

    # Fail if we were not able to locate an origin at all
    if not origin:
        return _fail("Origin checking failed - no Origin or Referer.")

    # Parse our origin so we can extract the required information from it.
    originp = urlparse.urlparse(origin)

    # Ensure that our Referer is also secure.
    if originp.scheme != "https":
        return _fail(
            "Referer checking failed - Referer is insecure while host is "
            "secure."
        )

    # Determine which origins we trust, which by default will include the
    # current origin.
    if trusted_origins is None:
        trusted_origins = aslist(
            request.registry.settings.get(
                "pyramid.csrf_trusted_origins", []
            )
        )

    # The port is only spelled out when it is not one of the standard ones.
    if request.host_port not in ("80", "443"):
        current_origin = "{0.domain}:{0.host_port}".format(request)
    else:
        current_origin = request.domain

    # Build a new list instead of appending in place: appending would mutate
    # the caller's ``trusted_origins``, which would then grow on every call
    # and permanently pick up each request's host as a trusted origin.
    allowed_origins = trusted_origins + [current_origin]

    # Actually check to see if the request's origin matches any of our
    # trusted origins.
    if not any(
        is_same_domain(originp.netloc, host) for host in allowed_origins
    ):
        reason = (
            "Referer checking failed - {0} does not match any trusted "
            "origins."
        )
        return _fail(reason.format(origin))

    return True