Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/pyramid/csrf.py : 31%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import uuid
3from webob.cookies import CookieProfile
4from zope.interface import implementer
7from pyramid.compat import bytes_, urlparse, text_
8from pyramid.exceptions import BadCSRFOrigin, BadCSRFToken
9from pyramid.interfaces import ICSRFStoragePolicy
10from pyramid.settings import aslist
11from pyramid.util import SimpleSerializer, is_same_domain, strings_differ
@implementer(ICSRFStoragePolicy)
class LegacySessionCSRFStoragePolicy(object):
    """ CSRF storage policy that hands all token management to the session.

    It keeps compatibility with legacy ISession implementations that manage
    CSRF tokens on their own through ``ISession.new_csrf_token`` and
    ``ISession.get_csrf_token``.

    A :term:`session factory` must be configured for this policy to work.

    .. versionadded:: 1.9

    """

    def new_csrf_token(self, request):
        """ Ask ``request.session`` for a brand new CSRF token and return
        it. """
        session = request.session
        return session.new_csrf_token()

    def get_csrf_token(self, request):
        """ Return the CSRF token reported by ``request.session``; the
        session is expected to generate one if none is active yet."""
        session = request.session
        return session.get_csrf_token()

    def check_csrf_token(self, request, supplied_token):
        """ Return ``True`` when ``supplied_token`` matches the session's
        token and ``False`` otherwise."""
        expected = bytes_(self.get_csrf_token(request))
        supplied = bytes_(supplied_token)
        # ``strings_differ`` reports a mismatch, so the result is negated.
        return not strings_differ(expected, supplied)
@implementer(ICSRFStoragePolicy)
class SessionCSRFStoragePolicy(object):
    """ CSRF storage policy that keeps the CSRF token inside the session.

    A :term:`session factory` must be configured for this policy to work.

    ``key``

        Name of the session key under which the CSRF token is stored.
        Default: `_csrft_`.

    .. versionadded:: 1.9

    """

    @staticmethod
    def _token_factory():
        """ Build a new token: the hex form of ``uuid.uuid4()`` as text. """
        return text_(uuid.uuid4().hex)

    def __init__(self, key='_csrft_'):
        """ Remember the session key used to store the token. """
        self.key = key

    def new_csrf_token(self, request):
        """ Create a token, store it in the session under ``self.key`` and
        return it. """
        fresh = self._token_factory()
        request.session[self.key] = fresh
        return fresh

    def get_csrf_token(self, request):
        """ Return the token stored in the session, creating and storing a
        new one when the session holds none (or an empty value)."""
        stored = request.session.get(self.key, None)
        if stored:
            return stored
        return self.new_csrf_token(request)

    def check_csrf_token(self, request, supplied_token):
        """ Return ``True`` when ``supplied_token`` matches the stored token
        and ``False`` otherwise."""
        expected = bytes_(self.get_csrf_token(request))
        supplied = bytes_(supplied_token)
        # ``strings_differ`` reports a mismatch, so the result is negated.
        return not strings_differ(expected, supplied)
@implementer(ICSRFStoragePolicy)
class CookieCSRFStoragePolicy(object):
    """ Alternative CSRF implementation that keeps its token in an
    unauthenticated cookie, the 'Double Submit Cookie' method described in
    the `OWASP CSRF guidelines <https://www.owasp.org/index.php/
    Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet#
    Double_Submit_Cookie>`_. This gives some additional flexibility with
    regards to scaling, because tokens can be generated and verified by a
    front-end server.

    .. versionadded:: 1.9

    .. versionchanged:: 1.10

       Added the ``samesite`` option and made the default ``'Lax'``.

    """

    @staticmethod
    def _token_factory():
        """ Build a new token: the hex form of ``uuid.uuid4()`` as text. """
        return text_(uuid.uuid4().hex)

    def __init__(
        self,
        cookie_name='csrf_token',
        secure=False,
        httponly=False,
        domain=None,
        max_age=None,
        path='/',
        samesite='Lax',
    ):
        """ Build the ``CookieProfile`` used to read and write the token
        cookie; every argument is forwarded to that profile. """
        self.cookie_name = cookie_name
        self.cookie_profile = CookieProfile(
            cookie_name=cookie_name,
            secure=secure,
            max_age=max_age,
            httponly=httponly,
            path=path,
            # The profile takes a list of domains; wrap the single value.
            domains=[domain],
            serializer=SimpleSerializer(),
            samesite=samesite,
        )

    def new_csrf_token(self, request):
        """ Create a token, expose it on the current request's cookies and
        schedule it to be set on the response; return the token. """
        fresh = self._token_factory()
        # Make the token visible to later reads within this same request.
        request.cookies[self.cookie_name] = fresh

        def _emit_cookie(_request, response):
            # Runs as a response callback, once the response exists.
            self.cookie_profile.set_cookies(response, fresh)

        request.add_response_callback(_emit_cookie)
        return fresh

    def get_csrf_token(self, request):
        """ Return the token carried by the request's cookie, creating a
        new token when the cookie is missing or empty."""
        current = self.cookie_profile.bind(request).get_value()
        if current:
            return current
        return self.new_csrf_token(request)

    def check_csrf_token(self, request, supplied_token):
        """ Return ``True`` when ``supplied_token`` matches the cookie token
        and ``False`` otherwise."""
        expected = bytes_(self.get_csrf_token(request))
        supplied = bytes_(supplied_token)
        # ``strings_differ`` reports a mismatch, so the result is negated.
        return not strings_differ(expected, supplied)
def get_csrf_token(request):
    """ Return the currently active CSRF token for ``request``, generating
    a new one using ``new_csrf_token(request)`` if one does not exist.

    The work is delegated to the :class:`pyramid.interfaces.ICSRFStoragePolicy`
    utility registered in ``request.registry``.

    .. versionadded :: 1.9

    """
    policy = request.registry.getUtility(ICSRFStoragePolicy)
    return policy.get_csrf_token(request)
def new_csrf_token(request):
    """ Generate a new CSRF token for ``request`` and persist it in an
    implementation defined manner.

    The work is delegated to the :class:`pyramid.interfaces.ICSRFStoragePolicy`
    utility registered in ``request.registry``.

    .. versionadded :: 1.9

    """
    policy = request.registry.getUtility(ICSRFStoragePolicy)
    return policy.new_csrf_token(request)
def check_csrf_token(
    request, token='csrf_token', header='X-CSRF-Token', raises=True
):
    """ Verify the CSRF token supplied with ``request`` against the
    configured :class:`pyramid.interfaces.ICSRFStoragePolicy`.

    The supplied token is read from ``request.headers.get(header)`` and, if
    that yields nothing, from ``request.POST.get(token)``. When the
    ``token`` keyword is not supplied the string ``csrf_token`` is used to
    look up the token in ``request.POST``; when the ``header`` keyword is
    not supplied the string ``X-CSRF-Token`` is used to look it up in
    ``request.headers``. Passing ``None`` for either one skips that lookup.

    If the supplied value cannot be verified by the storage policy and
    ``raises`` is ``True``, :exc:`pyramid.exceptions.BadCSRFToken` is
    raised. If it cannot be verified and ``raises`` is ``False``, ``False``
    is returned. On success ``True`` is returned unconditionally.

    See :ref:`auto_csrf_checking` for information about how to secure your
    application automatically against CSRF attacks.

    .. versionadded:: 1.4a2

    .. versionchanged:: 1.7a1
       A CSRF token passed in the query string of the request is no longer
       considered valid. It must be passed in either the request body or
       a header.

    .. versionchanged:: 1.9
       Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` and updated
       to use the configured :class:`pyramid.interfaces.ICSRFStoragePolicy`
       to verify the CSRF token.

    """
    # The header lookup comes first because it is significantly cheaper
    # than inspecting the POST body.
    supplied_token = ""
    if header is not None:
        supplied_token = request.headers.get(header, "")

    # Fall back to the request body. ``request.POST`` is used on purpose:
    # CSRF tokens should never appear in a URL (that is a security issue),
    # and form encoded data is only supported via ``request.POST``.
    if supplied_token == "" and token is not None:
        supplied_token = request.POST.get(token, "")

    storage_policy = request.registry.getUtility(ICSRFStoragePolicy)
    if storage_policy.check_csrf_token(request, text_(supplied_token)):
        return True

    if raises:
        raise BadCSRFToken('check_csrf_token(): Invalid token')
    return False
def check_csrf_origin(request, trusted_origins=None, raises=True):
    """
    Check the ``Origin`` of the request to see if it is a cross site request
    or not.

    If the value supplied by the ``Origin`` or ``Referer`` header isn't one
    of the trusted origins and ``raises`` is ``True``, this function will
    raise a :exc:`pyramid.exceptions.BadCSRFOrigin` exception, but if
    ``raises`` is ``False``, this function will return ``False`` instead. If
    the CSRF origin checks are successful this function will return ``True``
    unconditionally.

    Additional trusted origins may be added by passing a list of domain (and
    ports if non-standard like ``['example.com', 'dev.example.com:8080']``)
    in with the ``trusted_origins`` parameter. If ``trusted_origins`` is
    ``None`` (the default) this list of additional domains will be pulled
    from the ``pyramid.csrf_trusted_origins`` setting. The sequence passed in
    is never modified; the request's own domain is added to a private copy.

    Note that this function will do nothing if ``request.scheme`` is not
    ``https``.

    .. versionadded:: 1.7

    .. versionchanged:: 1.9
       Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf`

    """

    def _fail(reason):
        # Report a failed check the way the caller asked for it.
        if raises:
            raise BadCSRFOrigin(reason)
        else:
            return False

    # Origin checking is only performed for HTTPS requests.
    if request.scheme != "https":
        return True

    # Suppose user visits http://example.com/
    # An active network attacker (man-in-the-middle, MITM) sends a
    # POST form that targets https://example.com/detonate-bomb/ and
    # submits it via JavaScript.
    #
    # The attacker will need to provide a CSRF cookie and token, but
    # that's no problem for a MITM when we cannot make any assumptions
    # about what kind of session storage is being used. So the MITM can
    # circumvent the CSRF protection. This is true for any HTTP connection,
    # but anyone using HTTPS expects better! For this reason, for
    # https://example.com/ we need additional protection that treats
    # http://example.com/ as completely untrusted. Under HTTPS,
    # Barth et al. found that the Referer header is missing for
    # same-domain requests in only about 0.2% of cases or less, so
    # we can use strict Referer checking.

    # Determine the origin of this request, preferring the Origin header
    # and falling back to the Referer.
    origin = request.headers.get("Origin")
    if origin is None:
        origin = request.referrer

    # Fail if we were not able to locate an origin at all
    if not origin:
        return _fail("Origin checking failed - no Origin or Referer.")

    # Parse our origin so we can extract the required information from it.
    originp = urlparse.urlparse(origin)

    # Ensure that our Referer is also secure.
    if originp.scheme != "https":
        return _fail(
            "Referer checking failed - Referer is insecure while host is "
            "secure."
        )

    # Determine which origins we trust, which by default will include the
    # current origin.
    if trusted_origins is None:
        trusted_origins = aslist(
            request.registry.settings.get("pyramid.csrf_trusted_origins", [])
        )

    # Work on a private copy: appending to the caller's list would make it
    # grow on every call and keep trusting hosts from earlier requests. It
    # also lets callers pass a tuple or any other iterable.
    trusted_origins = list(trusted_origins)

    if request.host_port not in ("80", "443"):
        trusted_origins.append("{0.domain}:{0.host_port}".format(request))
    else:
        trusted_origins.append(request.domain)

    # Actually check to see if the request's origin matches any of our
    # trusted origins.
    if not any(
        is_same_domain(originp.netloc, host) for host in trusted_origins
    ):
        reason = (
            "Referer checking failed - {0} does not match any trusted "
            "origins."
        )
        return _fail(reason.format(origin))

    return True