Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/alembic/util/langhelpers.py : 55%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import collections
2import textwrap
3import uuid
4import warnings
6from .compat import callable
7from .compat import collections_abc
8from .compat import exec_
9from .compat import inspect_getargspec
10from .compat import string_types
11from .compat import with_metaclass
14class _ModuleClsMeta(type):
15 def __setattr__(cls, key, value):
16 super(_ModuleClsMeta, cls).__setattr__(key, value)
17 cls._update_module_proxies(key)
20class ModuleClsProxy(with_metaclass(_ModuleClsMeta)):
21 """Create module level proxy functions for the
22 methods on a given class.
24 The functions will have a compatible signature
25 as the methods.
27 """
29 _setups = collections.defaultdict(lambda: (set(), []))
31 @classmethod
32 def _update_module_proxies(cls, name):
33 attr_names, modules = cls._setups[cls]
34 for globals_, locals_ in modules:
35 cls._add_proxied_attribute(name, globals_, locals_, attr_names)
37 def _install_proxy(self):
38 attr_names, modules = self._setups[self.__class__]
39 for globals_, locals_ in modules:
40 globals_["_proxy"] = self
41 for attr_name in attr_names:
42 globals_[attr_name] = getattr(self, attr_name)
44 def _remove_proxy(self):
45 attr_names, modules = self._setups[self.__class__]
46 for globals_, locals_ in modules:
47 globals_["_proxy"] = None
48 for attr_name in attr_names:
49 del globals_[attr_name]
51 @classmethod
52 def create_module_class_proxy(cls, globals_, locals_):
53 attr_names, modules = cls._setups[cls]
54 modules.append((globals_, locals_))
55 cls._setup_proxy(globals_, locals_, attr_names)
57 @classmethod
58 def _setup_proxy(cls, globals_, locals_, attr_names):
59 for methname in dir(cls):
60 cls._add_proxied_attribute(methname, globals_, locals_, attr_names)
62 @classmethod
63 def _add_proxied_attribute(cls, methname, globals_, locals_, attr_names):
64 if not methname.startswith("_"):
65 meth = getattr(cls, methname)
66 if callable(meth):
67 locals_[methname] = cls._create_method_proxy(
68 methname, globals_, locals_
69 )
70 else:
71 attr_names.add(methname)
73 @classmethod
74 def _create_method_proxy(cls, name, globals_, locals_):
75 fn = getattr(cls, name)
77 def _name_error(name):
78 raise NameError(
79 "Can't invoke function '%s', as the proxy object has "
80 "not yet been "
81 "established for the Alembic '%s' class. "
82 "Try placing this code inside a callable."
83 % (name, cls.__name__)
84 )
86 globals_["_name_error"] = _name_error
88 translations = getattr(fn, "_legacy_translations", [])
89 if translations:
90 spec = inspect_getargspec(fn)
91 if spec[0] and spec[0][0] == "self":
92 spec[0].pop(0)
94 outer_args = inner_args = "*args, **kw"
95 translate_str = "args, kw = _translate(%r, %r, %r, args, kw)" % (
96 fn.__name__,
97 tuple(spec),
98 translations,
99 )
101 def translate(fn_name, spec, translations, args, kw):
102 return_kw = {}
103 return_args = []
105 for oldname, newname in translations:
106 if oldname in kw:
107 warnings.warn(
108 "Argument %r is now named %r "
109 "for method %s()." % (oldname, newname, fn_name)
110 )
111 return_kw[newname] = kw.pop(oldname)
112 return_kw.update(kw)
114 args = list(args)
115 if spec[3]:
116 pos_only = spec[0][: -len(spec[3])]
117 else:
118 pos_only = spec[0]
119 for arg in pos_only:
120 if arg not in return_kw:
121 try:
122 return_args.append(args.pop(0))
123 except IndexError:
124 raise TypeError(
125 "missing required positional argument: %s"
126 % arg
127 )
128 return_args.extend(args)
130 return return_args, return_kw
132 globals_["_translate"] = translate
133 else:
134 outer_args = "*args, **kw"
135 inner_args = "*args, **kw"
136 translate_str = ""
138 func_text = textwrap.dedent(
139 """\
140 def %(name)s(%(args)s):
141 %(doc)r
142 %(translate)s
143 try:
144 p = _proxy
145 except NameError:
146 _name_error('%(name)s')
147 return _proxy.%(name)s(%(apply_kw)s)
148 e
149 """
150 % {
151 "name": name,
152 "translate": translate_str,
153 "args": outer_args,
154 "apply_kw": inner_args,
155 "doc": fn.__doc__,
156 }
157 )
158 lcl = {}
159 exec_(func_text, globals_, lcl)
160 return lcl[name]
163def _with_legacy_names(translations):
164 def decorate(fn):
165 fn._legacy_translations = translations
166 return fn
168 return decorate
171def asbool(value):
172 return value is not None and value.lower() == "true"
175def rev_id():
176 return uuid.uuid4().hex[-12:]
179def to_list(x, default=None):
180 if x is None:
181 return default
182 elif isinstance(x, string_types):
183 return [x]
184 elif isinstance(x, collections_abc.Iterable):
185 return list(x)
186 else:
187 return [x]
190def to_tuple(x, default=None):
191 if x is None:
192 return default
193 elif isinstance(x, string_types):
194 return (x,)
195 elif isinstance(x, collections_abc.Iterable):
196 return tuple(x)
197 else:
198 return (x,)
201def unique_list(seq, hashfunc=None):
202 seen = set()
203 seen_add = seen.add
204 if not hashfunc:
205 return [x for x in seq if x not in seen and not seen_add(x)]
206 else:
207 return [
208 x
209 for x in seq
210 if hashfunc(x) not in seen and not seen_add(hashfunc(x))
211 ]
214def dedupe_tuple(tup):
215 return tuple(unique_list(tup))
218class memoized_property(object):
220 """A read-only @property that is only evaluated once."""
222 def __init__(self, fget, doc=None):
223 self.fget = fget
224 self.__doc__ = doc or fget.__doc__
225 self.__name__ = fget.__name__
227 def __get__(self, obj, cls):
228 if obj is None:
229 return self
230 obj.__dict__[self.__name__] = result = self.fget(obj)
231 return result
234class immutabledict(dict):
235 def _immutable(self, *arg, **kw):
236 raise TypeError("%s object is immutable" % self.__class__.__name__)
238 __delitem__ = (
239 __setitem__
240 ) = __setattr__ = clear = pop = popitem = setdefault = update = _immutable
242 def __new__(cls, *args):
243 new = dict.__new__(cls)
244 dict.__init__(new, *args)
245 return new
247 def __init__(self, *args):
248 pass
250 def __reduce__(self):
251 return immutabledict, (dict(self),)
253 def union(self, d):
254 if not self:
255 return immutabledict(d)
256 else:
257 d2 = immutabledict(self)
258 dict.update(d2, d)
259 return d2
261 def __repr__(self):
262 return "immutabledict(%s)" % dict.__repr__(self)
265class Dispatcher(object):
266 def __init__(self, uselist=False):
267 self._registry = {}
268 self.uselist = uselist
270 def dispatch_for(self, target, qualifier="default"):
271 def decorate(fn):
272 if self.uselist:
273 self._registry.setdefault((target, qualifier), []).append(fn)
274 else:
275 assert (target, qualifier) not in self._registry
276 self._registry[(target, qualifier)] = fn
277 return fn
279 return decorate
281 def dispatch(self, obj, qualifier="default"):
283 if isinstance(obj, string_types):
284 targets = [obj]
285 elif isinstance(obj, type):
286 targets = obj.__mro__
287 else:
288 targets = type(obj).__mro__
290 for spcls in targets:
291 if qualifier != "default" and (spcls, qualifier) in self._registry:
292 return self._fn_or_list(self._registry[(spcls, qualifier)])
293 elif (spcls, "default") in self._registry:
294 return self._fn_or_list(self._registry[(spcls, "default")])
295 else:
296 raise ValueError("no dispatch function for object: %s" % obj)
298 def _fn_or_list(self, fn_or_list):
299 if self.uselist:
301 def go(*arg, **kw):
302 for fn in fn_or_list:
303 fn(*arg, **kw)
305 return go
306 else:
307 return fn_or_list
309 def branch(self):
310 """Return a copy of this dispatcher that is independently
311 writable."""
313 d = Dispatcher()
314 if self.uselist:
315 d._registry.update(
316 (k, [fn for fn in self._registry[k]]) for k in self._registry
317 )
318 else:
319 d._registry.update(self._registry)
320 return d