# mako/util.py
# Copyright 2006-2020 the Mako authors and contributors <see AUTHORS file>
#
# This module is part of Mako and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from __future__ import absolute_import

from ast import parse
import codecs
import collections
import operator
import os
import re
import timeit

from mako import compat


def update_wrapper(decorated, fn):
    decorated.__wrapped__ = fn
    decorated.__name__ = fn.__name__
    return decorated


class PluginLoader(object):
    def __init__(self, group):
        self.group = group
        self.impls = {}

    def load(self, name):
        if name in self.impls:
            return self.impls[name]()
        else:
            import pkg_resources

            for impl in pkg_resources.iter_entry_points(self.group, name):
                self.impls[name] = impl.load
                return impl.load()
            else:
                from mako import exceptions

                raise exceptions.RuntimeException(
                    "Can't load plugin %s %s" % (self.group, name)
                )

    def register(self, name, modulepath, objname):
        def load():
            mod = __import__(modulepath)
            for token in modulepath.split(".")[1:]:
                mod = getattr(mod, token)
            return getattr(mod, objname)

        self.impls[name] = load
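

# A minimal usage sketch (an addition for illustration, not part of the
# original module). The group name "example.group" and plugin name "odict"
# are made up; register() maps a name to a dotted module path and attribute,
# and load() then resolves and caches it without touching pkg_resources.
def _plugin_loader_example():  # illustrative only
    lookup = PluginLoader("example.group")
    lookup.register("odict", "collections", "OrderedDict")
    # The first load() call runs the registered loader; repeat calls reuse
    # the cached callable stored in self.impls.
    return lookup.load("odict")  # -> collections.OrderedDict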


def verify_directory(dir_):
    """create and/or verify a filesystem directory."""

    tries = 0

    while not os.path.exists(dir_):
        try:
            tries += 1
            os.makedirs(dir_, compat.octal("0775"))
        except:
            if tries > 5:
                raise


def to_list(x, default=None):
    if x is None:
        return default
    if not isinstance(x, (list, tuple)):
        return [x]
    else:
        return x


class memoized_property(object):

    """A read-only @property that is only evaluated once."""

    def __init__(self, fget, doc=None):
        self.fget = fget
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__

    def __get__(self, obj, cls):
        if obj is None:
            return self
        obj.__dict__[self.__name__] = result = self.fget(obj)
        return result
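

# A small usage sketch (added for illustration, not in the original source).
# The first attribute access runs the decorated method; the computed value
# then replaces the descriptor in the instance __dict__, so later accesses
# are plain attribute lookups with no function call.
class _Report(object):  # hypothetical example class
    @memoized_property
    def total(self):
        print("computing total once")
        return 40 + 2


# _Report().total prints once and returns 42; a second access on the same
# instance returns the cached 42 without calling total() again.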


class memoized_instancemethod(object):

    """Decorate a method to memoize its return value.

    Best applied to no-arg methods: memoization is not sensitive to
    argument values, and will always return the same value even when
    called with different arguments.

    """

    def __init__(self, fget, doc=None):
        self.fget = fget
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__

    def __get__(self, obj, cls):
        if obj is None:
            return self

        def oneshot(*args, **kw):
            result = self.fget(obj, *args, **kw)

            def memo(*a, **kw):
                return result

            memo.__name__ = self.__name__
            memo.__doc__ = self.__doc__
            obj.__dict__[self.__name__] = memo
            return result

        oneshot.__name__ = self.__name__
        oneshot.__doc__ = self.__doc__
        return oneshot
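

# A short usage sketch (an addition, not part of the original module). Unlike
# memoized_property, the attribute stays callable: the first call stores a
# "memo" function on the instance, and every later call returns that first
# result regardless of arguments, as the docstring above warns.
class _Settings(object):  # hypothetical example class
    @memoized_instancemethod
    def load(self):
        print("loading once")
        return {"debug": True}


# s = _Settings(); s.load() runs the body and caches the dict; s.load()
# afterwards returns the same cached object without re-running it.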


class SetLikeDict(dict):

    """a dictionary that has some setlike methods on it"""

    def union(self, other):
        """produce a 'union' of this dict and another (at the key level).

        values in the second dict take precedence over those of the first"""
        x = SetLikeDict(**self)
        x.update(other)
        return x
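

# A tiny usage sketch (added for illustration): union() merges at the key
# level, and the right-hand dict wins on collisions, giving set-style
# semantics for keys.
def _setlikedict_example():  # illustrative only
    base = SetLikeDict(a=1, b=2)
    merged = base.union({"b": 20, "c": 3})
    return merged  # {'a': 1, 'b': 20, 'c': 3}; the second dict's 'b' wins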


class FastEncodingBuffer(object):

    """a very rudimentary buffer that is faster than StringIO,
    but doesn't crash on unicode data like cStringIO."""

    def __init__(self, encoding=None, errors="strict", as_unicode=False):
        self.data = collections.deque()
        self.encoding = encoding
        if as_unicode:
            self.delim = compat.u("")
        else:
            self.delim = ""
        self.as_unicode = as_unicode
        self.errors = errors
        self.write = self.data.append

    def truncate(self):
        self.data = collections.deque()
        self.write = self.data.append

    def getvalue(self):
        if self.encoding:
            return self.delim.join(self.data).encode(
                self.encoding, self.errors
            )
        else:
            return self.delim.join(self.data)
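

# A minimal usage sketch (an addition for illustration). The buffer is just a
# deque of string fragments joined on demand; passing an encoding makes
# getvalue() return bytes instead of text.
def _fast_buffer_example():  # illustrative only
    buf = FastEncodingBuffer(encoding="utf-8")
    buf.write(compat.u("hello "))
    buf.write(compat.u("world"))
    return buf.getvalue()  # b'hello world' -- bytes, because an encoding was given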


class LRUCache(dict):

    """A dictionary-like object that stores a limited number of items,
    discarding lesser-used items periodically.

    This is a rewrite of LRUCache from Myghty to use a periodic
    timestamp-based paradigm so that synchronization is not really needed.
    The size management is inexact.
    """

    class _Item(object):
        def __init__(self, key, value):
            self.key = key
            self.value = value
            self.timestamp = timeit.default_timer()

        def __repr__(self):
            return repr(self.value)

    def __init__(self, capacity, threshold=0.5):
        self.capacity = capacity
        self.threshold = threshold

    def __getitem__(self, key):
        item = dict.__getitem__(self, key)
        item.timestamp = timeit.default_timer()
        return item.value

    def values(self):
        return [i.value for i in dict.values(self)]

    def setdefault(self, key, value):
        if key in self:
            return self[key]
        else:
            self[key] = value
            return value

    def __setitem__(self, key, value):
        item = dict.get(self, key)
        if item is None:
            item = self._Item(key, value)
            dict.__setitem__(self, key, item)
        else:
            item.value = value
        self._manage_size()

    def _manage_size(self):
        while len(self) > self.capacity + self.capacity * self.threshold:
            bytime = sorted(
                dict.values(self),
                key=operator.attrgetter("timestamp"),
                reverse=True,
            )
            for item in bytime[self.capacity :]:
                try:
                    del self[item.key]
                except KeyError:
                    # if we couldn't find a key, most likely some other thread
                    # broke in on us. loop around and try again
                    break
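

# A small usage sketch (an addition, not in the original file). With
# capacity=2 and threshold=0.5, pruning only starts once the dict grows past
# capacity * (1 + threshold) = 3 entries; the oldest timestamps are then
# discarded down to `capacity` items, which is why the sizing is "inexact".
def _lru_cache_example():  # illustrative only
    cache = LRUCache(2, threshold=0.5)
    cache["a"] = 1
    cache["b"] = 2
    cache["c"] = 3  # len == 3 is not > 3, so nothing is pruned yet
    cache["d"] = 4  # len == 4 > 3 triggers _manage_size()
    return list(cache)  # only the two most recently stamped keys remain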


# Regexp to match python magic encoding line
_PYTHON_MAGIC_COMMENT_re = re.compile(
    r"[ \t\f]* \# .* coding[=:][ \t]*([-\w.]+)", re.VERBOSE
)


def parse_encoding(fp):
    """Deduce the encoding of a Python source file (binary mode) from its
    magic comment.

    It does this in the same way as the `Python interpreter`__.

    .. __: http://docs.python.org/ref/encodings.html

    The ``fp`` argument should be a seekable file object in binary mode.
    """
    pos = fp.tell()
    fp.seek(0)
    try:
        line1 = fp.readline()
        has_bom = line1.startswith(codecs.BOM_UTF8)
        if has_bom:
            line1 = line1[len(codecs.BOM_UTF8) :]

        m = _PYTHON_MAGIC_COMMENT_re.match(line1.decode("ascii", "ignore"))
        if not m:
            try:
                parse(line1.decode("ascii", "ignore"))
            except (ImportError, SyntaxError):
                # Either it's a real syntax error, in which case the source
                # is not valid python source, or line2 is a continuation of
                # line1, in which case we don't want to scan line2 for a magic
                # comment.
                pass
            else:
                line2 = fp.readline()
                m = _PYTHON_MAGIC_COMMENT_re.match(
                    line2.decode("ascii", "ignore")
                )

        if has_bom:
            if m:
                raise SyntaxError(
                    "python refuses to compile code with both a UTF8"
                    " byte-order-mark and a magic encoding comment"
                )
            return "utf_8"
        elif m:
            return m.group(1)
        else:
            return None
    finally:
        fp.seek(pos)
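

# A brief usage sketch (added for illustration): feed a seekable binary
# stream containing a PEP 263 magic comment and parse_encoding() reports the
# declared encoding. The source text below is made up for the demo.
def _parse_encoding_example():  # illustrative only
    import io

    src = b"# -*- coding: latin-1 -*-\nx = 1\n"
    return parse_encoding(io.BytesIO(src))  # -> 'latin-1'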


def sorted_dict_repr(d):
    """repr() a dictionary with the keys in order.

    Used by the lexer unit test to compare parse trees based on strings.
    """
    keys = list(d.keys())
    keys.sort()
    return "{" + ", ".join(["%r: %r" % (k, d[k]) for k in keys]) + "}"


def restore__ast(_ast):
    """Attempt to restore the required classes to the _ast module if it
    appears to be missing them.
    """
    if hasattr(_ast, "AST"):
        return
    _ast.PyCF_ONLY_AST = 2 << 9
    m = compile(
        """\
def foo(): pass
class Bar(object): pass
if False: pass
baz = 'mako'
1 + 2 - 3 * 4 / 5
6 // 7 % 8 << 9 >> 10
11 & 12 ^ 13 | 14
15 and 16 or 17
-baz + (not +18) - ~17
baz and 'foo' or 'bar'
(mako is baz == baz) is not baz != mako
mako > baz < mako >= baz <= mako
mako in baz not in mako""",
        "<unknown>",
        "exec",
        _ast.PyCF_ONLY_AST,
    )
    _ast.Module = type(m)

    for cls in _ast.Module.__mro__:
        if cls.__name__ == "mod":
            _ast.mod = cls
        elif cls.__name__ == "AST":
            _ast.AST = cls

    _ast.FunctionDef = type(m.body[0])
    _ast.ClassDef = type(m.body[1])
    _ast.If = type(m.body[2])

    _ast.Name = type(m.body[3].targets[0])
    _ast.Store = type(m.body[3].targets[0].ctx)
    _ast.Str = type(m.body[3].value)

    _ast.Sub = type(m.body[4].value.op)
    _ast.Add = type(m.body[4].value.left.op)
    _ast.Div = type(m.body[4].value.right.op)
    _ast.Mult = type(m.body[4].value.right.left.op)

    _ast.RShift = type(m.body[5].value.op)
    _ast.LShift = type(m.body[5].value.left.op)
    _ast.Mod = type(m.body[5].value.left.left.op)
    _ast.FloorDiv = type(m.body[5].value.left.left.left.op)

    _ast.BitOr = type(m.body[6].value.op)
    _ast.BitXor = type(m.body[6].value.left.op)
    _ast.BitAnd = type(m.body[6].value.left.left.op)

    _ast.Or = type(m.body[7].value.op)
    _ast.And = type(m.body[7].value.values[0].op)

    _ast.Invert = type(m.body[8].value.right.op)
    _ast.Not = type(m.body[8].value.left.right.op)
    _ast.UAdd = type(m.body[8].value.left.right.operand.op)
    _ast.USub = type(m.body[8].value.left.left.op)

    _ast.Or = type(m.body[9].value.op)
    _ast.And = type(m.body[9].value.values[0].op)

    _ast.IsNot = type(m.body[10].value.ops[0])
    _ast.NotEq = type(m.body[10].value.ops[1])
    _ast.Is = type(m.body[10].value.left.ops[0])
    _ast.Eq = type(m.body[10].value.left.ops[1])

    _ast.Gt = type(m.body[11].value.ops[0])
    _ast.Lt = type(m.body[11].value.ops[1])
    _ast.GtE = type(m.body[11].value.ops[2])
    _ast.LtE = type(m.body[11].value.ops[3])

    _ast.In = type(m.body[12].value.ops[0])
    _ast.NotIn = type(m.body[12].value.ops[1])


def read_file(path, mode="rb"):
    fp = open(path, mode)
    try:
        data = fp.read()
        return data
    finally:
        fp.close()


def read_python_file(path):
    fp = open(path, "rb")
    try:
        encoding = parse_encoding(fp)
        data = fp.read()
        if encoding:
            data = data.decode(encoding)
        return data
    finally:
        fp.close()
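

# A closing usage sketch (an addition, not part of the original module):
# read_python_file() ties the helpers above together -- it opens the file in
# binary mode, lets parse_encoding() sniff the magic comment, and decodes the
# bytes accordingly. The temporary file here exists only for the demo.
def _read_python_file_example():  # illustrative only
    import tempfile

    with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as f:
        f.write(b"# -*- coding: latin-1 -*-\nname = 'caf\xe9'\n")
    return read_python_file(f.name)  # decoded text, via the latin-1 comment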