Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/webob/descriptors.py : 41%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import re
3from datetime import (
4 date,
5 datetime,
6 )
8from collections import namedtuple
10from webob.byterange import (
11 ContentRange,
12 Range,
13 )
15from webob.compat import (
16 PY2,
17 text_type,
18 )
20from webob.datetime_utils import (
21 parse_date,
22 serialize_date,
23 )
25from webob.util import (
26 header_docstring,
27 warn_deprecation,
28 )
# Captures the text following ``;charset=`` up to the next ``;`` (case-insensitive).
CHARSET_RE = re.compile(r';\s*charset=([^;]*)', re.IGNORECASE)
# Matches one or more letters followed by ``:`` at the start of a string.
SCHEME_RE = re.compile(r'^[a-z]+:', re.IGNORECASE)

# Unique sentinel used to tell "no default supplied" apart from ``None``.
_not_given = object()
def environ_getter(key, default=_not_given, rfc_section=None):
    """
    Build a property that proxies ``req.environ[key]``.

    When ``default`` is omitted the property reads and writes the key
    directly and has no deleter.  When ``default`` is supplied, a missing
    key reads as ``default``, assigning ``None`` removes the key if it is
    present, and a deleter is provided.

    The property's docstring comes from ``header_docstring`` when
    ``rfc_section`` is truthy, otherwise from a generic template.
    """
    doc = (
        header_docstring(key, rfc_section)
        if rfc_section
        else "Gets and sets the ``%s`` key in the environment." % key
    )

    if default is _not_given:
        def fget(req):
            return req.environ[key]

        def fset(req, val):
            req.environ[key] = val

        return property(fget, fset, None, doc=doc)

    def fget(req):
        return req.environ.get(key, default)

    def fset(req, val):
        if val is not None:
            req.environ[key] = val
        elif key in req.environ:
            # Assigning None means "unset".
            del req.environ[key]

    def fdel(req):
        del req.environ[key]

    return property(fget, fset, fdel, doc=doc)
def environ_decoder(key, default=_not_given, rfc_section=None,
                    encattr=None):
    """
    Build a property that reads and writes ``key`` through the request's
    ``encget`` / ``encset`` methods, forwarding ``encattr`` to both.

    When ``default`` is omitted there is no deleter.  When ``default`` is
    supplied it is passed to ``encget``, assigning ``None`` removes the key
    from ``req.environ`` if present, and a deleter is provided.
    """
    doc = (
        header_docstring(key, rfc_section)
        if rfc_section
        else "Gets and sets the ``%s`` key in the environment." % key
    )

    if default is _not_given:
        def fget(req):
            return req.encget(key, encattr=encattr)

        def fset(req, val):
            return req.encset(key, val, encattr=encattr)

        return property(fget, fset, None, doc=doc)

    def fget(req):
        return req.encget(key, default, encattr=encattr)

    def fset(req, val):
        if val is not None:
            return req.encset(key, val, encattr=encattr)
        if key in req.environ:
            # Assigning None means "unset".
            del req.environ[key]

    def fdel(req):
        del req.environ[key]

    return property(fget, fset, fdel, doc=doc)
def upath_property(key):
    """
    Build a property exposing ``req.environ[key]`` as text decoded with
    ``req.url_encoding``.

    Under ``PY2`` the environ value is decoded/encoded directly.  Otherwise
    the environ value is round-tripped through latin-1 so that the stored
    string carries the bytes of the ``url_encoding``-encoded text.
    """
    if PY2:
        def fget(req):
            charset = req.url_encoding
            raw = req.environ.get(key, '')
            return raw.decode(charset)

        def fset(req, val):
            charset = req.url_encoding
            # Only text needs encoding; anything else is stored as given.
            req.environ[key] = (
                val.encode(charset) if isinstance(val, text_type) else val
            )
    else:
        def fget(req):
            charset = req.url_encoding
            raw = req.environ.get(key, '')
            return raw.encode('latin-1').decode(charset)

        def fset(req, val):
            charset = req.url_encoding
            encoded = val.encode(charset)
            req.environ[key] = encoded.decode('latin-1')

    return property(fget, fset, doc='upath_property(%r)' % key)
def deprecated_property(attr, name, text, version):  # pragma: no cover
    """
    Wrap the descriptor ``attr`` in a property whose getter, setter and
    deleter each call ``warn_deprecation`` before delegating to ``attr``.
    """
    def warn():
        message = 'The attribute %s is deprecated: %s' % (name, text)
        # Stack level 3: warn() <- accessor below <- code touching the attribute.
        warn_deprecation(message, version, 3)

    def fget(self):
        warn()
        return attr.__get__(self, type(self))

    def fset(self, val):
        warn()
        attr.__set__(self, val)

    def fdel(self):
        warn()
        attr.__delete__(self)

    return property(fget, fset, fdel, doc='<Deprecated attribute %s>' % name)
def header_getter(header, rfc_section):
    """
    Build a property that reads, replaces and removes ``header`` in the
    owner's ``_headerlist`` (a list of ``(name, value)`` pairs), comparing
    header names case-insensitively.

    Reading returns the first matching value, or ``None`` when absent.
    Assigning always removes existing entries first; ``None`` therefore
    just deletes.  Values containing CR or LF raise ``ValueError``.
    """
    doc = header_docstring(header, rfc_section)
    key = header.lower()

    def fget(r):
        matches = (v for k, v in r._headerlist if k.lower() == key)
        return next(matches, None)

    def fset(r, value):
        fdel(r)
        if value is None:
            return
        if '\n' in value or '\r' in value:
            raise ValueError('Header value may not contain control characters')
        if isinstance(value, text_type) and PY2:
            value = value.encode('latin-1')
        r._headerlist.append((header, value))

    def fdel(r):
        # Slice-assign so the existing list object is mutated in place.
        r._headerlist[:] = [
            (name, val) for (name, val) in r._headerlist
            if name.lower() != key
        ]

    return property(fget, fset, fdel, doc)
def converter(prop, parse, serialize, convert_name=None):
    """
    Wrap the property ``prop`` so that values read through it pass through
    ``parse`` and non-``None`` values written through it pass through
    ``serialize``.  The deleter of ``prop`` is reused unchanged.

    ``convert_name`` is used in the generated docstring; it defaults to the
    names of the ``parse`` and ``serialize`` callables.
    """
    assert isinstance(prop, property)
    if not convert_name:
        convert_name = "``%s`` and ``%s``" % (parse.__name__,
                                              serialize.__name__)
    doc = (prop.__doc__ or '') + " Converts it using %s." % convert_name
    inner_get = prop.fget
    inner_set = prop.fset

    def fget(r):
        return parse(inner_get(r))

    def fset(r, val):
        # None is passed through untouched rather than serialized.
        inner_set(r, None if val is None else serialize(val))

    return property(fget, fset, prop.fdel, doc)
def list_header(header, rfc_section):
    """
    Build a header property (via ``header_getter``) whose value is converted
    with ``parse_list`` on read and ``serialize_list`` on write.
    """
    return converter(
        header_getter(header, rfc_section),
        parse_list,
        serialize_list,
        'list',
    )
def parse_list(value):
    """
    Split a comma-separated string into a tuple of stripped, non-empty
    items.  Returns ``None`` for a falsy ``value``.
    """
    if not value:
        return None
    stripped = (piece.strip() for piece in value.split(','))
    return tuple(item for item in stripped if item)
def serialize_list(value):
    """
    Turn ``value`` into a header string: text/bytes go through ``str()``
    as-is, any other iterable is joined with ``', '`` after applying
    ``str()`` to each item.
    """
    if not isinstance(value, (text_type, bytes)):
        return ', '.join(str(item) for item in value)
    return str(value)
def converter_date(prop):
    """Wrap ``prop`` with ``parse_date`` / ``serialize_date`` conversion."""
    return converter(
        prop,
        parse_date,
        serialize_date,
        'HTTP date',
    )
def date_header(header, rfc_section):
    """Build a header property (via ``header_getter``) with date conversion."""
    prop = header_getter(header, rfc_section)
    return converter_date(prop)
208########################
209## Converter functions
210########################
# Optional ``W/`` prefix followed by a double-quoted string that may contain
# backslash-escaped quotes.
_rx_etag = re.compile(r'(?:^|\s)(W/)?"((?:\\"|.)*?)"')


def parse_etag_response(value, strong=False):
    """
    Parse a response ETag.

    Returns ``None`` for a falsy ``value``, or for a weak (``W/``) tag when
    ``strong`` is true.  A value that does not match the quoted form is
    returned unchanged; otherwise the unquoted, unescaped tag is returned.

    See:
        * http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.19
        * http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.11
    """
    if not value:
        return None
    found = _rx_etag.match(value)
    if not found:
        # Not a well-formed etag; hand it back untouched.
        return value
    weak_marker, quoted = found.group(1), found.group(2)
    if strong and weak_marker:
        # Caller wants strong tags only and this one is weak.
        return None
    return quoted.replace('\\"', '"')
def serialize_etag_response(value):
    """
    Serialize an ETag for a response header.

    ``value`` is either a string or a ``(tag, strong)`` tuple.  A string
    that already matches the etag pattern is returned unchanged; otherwise
    the tag is quoted (escaping embedded double quotes) and prefixed with
    ``W/`` when ``strong`` is false.
    """
    if isinstance(value, tuple):
        value, strong = value
    else:
        if _rx_etag.match(value):
            # Already a valid etag.
            return value
        strong = True
    quoted = '"%s"' % value.replace('"', '\\"')
    return quoted if strong else 'W/' + quoted
def serialize_if_range(value):
    """
    Serialize an If-Range value: dates go through ``serialize_date``;
    anything else is converted with ``str()``, with an empty result
    mapped to ``None``.
    """
    if not isinstance(value, (datetime, date)):
        return str(value) or None
    return serialize_date(value)
def parse_range(value):
    """
    Parse a Range header with ``Range.parse``; a falsy ``value`` yields
    ``None``.  ``Range.parse`` might return ``None`` as well.
    """
    return Range.parse(value) if value else None
def serialize_range(value):
    """
    Serialize a Range value: falsy input gives ``None``, a list/tuple is
    expanded into ``Range(*value)`` and stringified, and a ``str`` is
    returned as-is.
    """
    if not value:
        return None
    if not isinstance(value, (list, tuple)):
        assert isinstance(value, str)
        return value
    return str(Range(*value))
def parse_int(value):
    """
    Convert ``value`` with ``int()``; ``None`` and the empty string map to
    ``None``.  Conversion errors from ``int()`` propagate.
    """
    missing = value is None or value == ''
    return None if missing else int(value)
def parse_int_safe(value):
    """
    Like ``parse_int`` but returns ``None`` instead of raising when
    ``int()`` rejects the value with ``ValueError``.
    """
    if value is None or value == '':
        return None
    try:
        parsed = int(value)
    except ValueError:
        parsed = None
    return parsed


# Integers are serialized with the built-in ``str``.
serialize_int = str
def parse_content_range(value):
    """
    Parse a Content-Range header with ``ContentRange.parse``; a falsy or
    whitespace-only ``value`` yields ``None``.  ``ContentRange.parse`` may
    still return ``None``.
    """
    if value and value.strip():
        return ContentRange.parse(value)
    return None
def serialize_content_range(value):
    """
    Serialize a Content-Range value to a header string.

    A list/tuple must be ``(begin, end)`` or ``(begin, end, length)`` and is
    turned into a ``ContentRange`` first; any other value is used directly.
    The result is ``str(value)`` stripped of surrounding whitespace, or
    ``None`` when that string is empty.

    Raises ``ValueError`` when a list/tuple has a length other than 2 or 3.
    """
    if isinstance(value, (tuple, list)):
        if len(value) not in (2, 3):
            # Wrap ``value`` in a 1-tuple: formatting with a bare tuple would
            # unpack it as the argument list and raise TypeError (or report
            # the wrong value) instead of producing this message.
            raise ValueError(
                "When setting content_range to a list/tuple, it must "
                "be length 2 or 3 (not %r)" % (value,))
        if len(value) == 2:
            begin, end = value
            length = None
        else:
            begin, end, length = value
        value = ContentRange(begin, end, length)
    value = str(value).strip()
    return value or None
# One ``name = value`` pair, where value is either double-quoted or runs up
# to the next comma; pairs are separated by commas.
_rx_auth_param = re.compile(r'([a-z]+)[ \t]*=[ \t]*(".*?"|[^,]*?)[ \t]*(?:\Z|, *)')


def parse_auth_params(params):
    """
    Parse ``name=value`` pairs out of ``params`` into a dict, stripping
    double quotes from both ends of each value.
    """
    return {
        name: raw.strip('"')
        for name, raw in _rx_auth_param.findall(params)
    }
# see http://lists.w3.org/Archives/Public/ietf-http-wg/2009OctDec/0297.html
# Stored as a dict (all values ``None``) keyed by scheme name.
known_auth_schemes = dict.fromkeys(
    (
        'Basic',
        'Digest',
        'WSSE',
        'HMACDigest',
        'GoogleLogin',
        'Cookie',
        'OpenID',
    ),
    None,
)

# Parsed form of an authorization header value: scheme plus its parameters.
_authorization = namedtuple('Authorization', ['authtype', 'params'])
def parse_auth(val):
    """
    Parse an authorization header value.

    ``None`` and values whose scheme (the text before the first space) is
    not in ``known_auth_schemes`` are returned unchanged.  Otherwise an
    ``Authorization(authtype, params)`` tuple is returned, where ``params``
    is a dict from ``parse_auth_params`` -- except for ``Basic`` values
    without any double quote, whose params stay a raw string.
    """
    if val is None:
        return val
    authtype, _sep, params = val.partition(' ')
    if authtype not in known_auth_schemes:
        return val
    # "Authentication: Basic XXXXX==" carries an opaque token, so it is only
    # parsed into key/value pairs when it contains a quote.
    if authtype != 'Basic' or '"' in params:
        params = parse_auth_params(params)
    return _authorization(authtype, params)
def serialize_auth(val):
    """
    Serialize an authorization value.

    A ``(authtype, params)`` tuple/list becomes ``"authtype params"``; dict
    params are rendered as comma-separated ``name="value"`` pairs.  Any
    other ``val`` is returned unchanged.
    """
    if not isinstance(val, (tuple, list)):
        return val
    authtype, params = val
    if isinstance(params, dict):
        params = ', '.join('%s="%s"' % pair for pair in params.items())
    assert isinstance(params, str)
    return '%s %s' % (authtype, params)