Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import collections 

2import textwrap 

3import uuid 

4import warnings 

5 

6from .compat import callable 

7from .compat import collections_abc 

8from .compat import exec_ 

9from .compat import inspect_getargspec 

10from .compat import string_types 

11from .compat import with_metaclass 

12 

13 

14class _ModuleClsMeta(type): 

15 def __setattr__(cls, key, value): 

16 super(_ModuleClsMeta, cls).__setattr__(key, value) 

17 cls._update_module_proxies(key) 

18 

19 

20class ModuleClsProxy(with_metaclass(_ModuleClsMeta)): 

21 """Create module level proxy functions for the 

22 methods on a given class. 

23 

24 The functions will have a compatible signature 

25 as the methods. 

26 

27 """ 

28 

29 _setups = collections.defaultdict(lambda: (set(), [])) 

30 

31 @classmethod 

32 def _update_module_proxies(cls, name): 

33 attr_names, modules = cls._setups[cls] 

34 for globals_, locals_ in modules: 

35 cls._add_proxied_attribute(name, globals_, locals_, attr_names) 

36 

37 def _install_proxy(self): 

38 attr_names, modules = self._setups[self.__class__] 

39 for globals_, locals_ in modules: 

40 globals_["_proxy"] = self 

41 for attr_name in attr_names: 

42 globals_[attr_name] = getattr(self, attr_name) 

43 

44 def _remove_proxy(self): 

45 attr_names, modules = self._setups[self.__class__] 

46 for globals_, locals_ in modules: 

47 globals_["_proxy"] = None 

48 for attr_name in attr_names: 

49 del globals_[attr_name] 

50 

51 @classmethod 

52 def create_module_class_proxy(cls, globals_, locals_): 

53 attr_names, modules = cls._setups[cls] 

54 modules.append((globals_, locals_)) 

55 cls._setup_proxy(globals_, locals_, attr_names) 

56 

57 @classmethod 

58 def _setup_proxy(cls, globals_, locals_, attr_names): 

59 for methname in dir(cls): 

60 cls._add_proxied_attribute(methname, globals_, locals_, attr_names) 

61 

62 @classmethod 

63 def _add_proxied_attribute(cls, methname, globals_, locals_, attr_names): 

64 if not methname.startswith("_"): 

65 meth = getattr(cls, methname) 

66 if callable(meth): 

67 locals_[methname] = cls._create_method_proxy( 

68 methname, globals_, locals_ 

69 ) 

70 else: 

71 attr_names.add(methname) 

72 

73 @classmethod 

74 def _create_method_proxy(cls, name, globals_, locals_): 

75 fn = getattr(cls, name) 

76 

77 def _name_error(name): 

78 raise NameError( 

79 "Can't invoke function '%s', as the proxy object has " 

80 "not yet been " 

81 "established for the Alembic '%s' class. " 

82 "Try placing this code inside a callable." 

83 % (name, cls.__name__) 

84 ) 

85 

86 globals_["_name_error"] = _name_error 

87 

88 translations = getattr(fn, "_legacy_translations", []) 

89 if translations: 

90 spec = inspect_getargspec(fn) 

91 if spec[0] and spec[0][0] == "self": 

92 spec[0].pop(0) 

93 

94 outer_args = inner_args = "*args, **kw" 

95 translate_str = "args, kw = _translate(%r, %r, %r, args, kw)" % ( 

96 fn.__name__, 

97 tuple(spec), 

98 translations, 

99 ) 

100 

101 def translate(fn_name, spec, translations, args, kw): 

102 return_kw = {} 

103 return_args = [] 

104 

105 for oldname, newname in translations: 

106 if oldname in kw: 

107 warnings.warn( 

108 "Argument %r is now named %r " 

109 "for method %s()." % (oldname, newname, fn_name) 

110 ) 

111 return_kw[newname] = kw.pop(oldname) 

112 return_kw.update(kw) 

113 

114 args = list(args) 

115 if spec[3]: 

116 pos_only = spec[0][: -len(spec[3])] 

117 else: 

118 pos_only = spec[0] 

119 for arg in pos_only: 

120 if arg not in return_kw: 

121 try: 

122 return_args.append(args.pop(0)) 

123 except IndexError: 

124 raise TypeError( 

125 "missing required positional argument: %s" 

126 % arg 

127 ) 

128 return_args.extend(args) 

129 

130 return return_args, return_kw 

131 

132 globals_["_translate"] = translate 

133 else: 

134 outer_args = "*args, **kw" 

135 inner_args = "*args, **kw" 

136 translate_str = "" 

137 

138 func_text = textwrap.dedent( 

139 """\ 

140 def %(name)s(%(args)s): 

141 %(doc)r 

142 %(translate)s 

143 try: 

144 p = _proxy 

145 except NameError: 

146 _name_error('%(name)s') 

147 return _proxy.%(name)s(%(apply_kw)s) 

148 e 

149 """ 

150 % { 

151 "name": name, 

152 "translate": translate_str, 

153 "args": outer_args, 

154 "apply_kw": inner_args, 

155 "doc": fn.__doc__, 

156 } 

157 ) 

158 lcl = {} 

159 exec_(func_text, globals_, lcl) 

160 return lcl[name] 

161 

162 

163def _with_legacy_names(translations): 

164 def decorate(fn): 

165 fn._legacy_translations = translations 

166 return fn 

167 

168 return decorate 

169 

170 

171def asbool(value): 

172 return value is not None and value.lower() == "true" 

173 

174 

175def rev_id(): 

176 return uuid.uuid4().hex[-12:] 

177 

178 

179def to_list(x, default=None): 

180 if x is None: 

181 return default 

182 elif isinstance(x, string_types): 

183 return [x] 

184 elif isinstance(x, collections_abc.Iterable): 

185 return list(x) 

186 else: 

187 return [x] 

188 

189 

190def to_tuple(x, default=None): 

191 if x is None: 

192 return default 

193 elif isinstance(x, string_types): 

194 return (x,) 

195 elif isinstance(x, collections_abc.Iterable): 

196 return tuple(x) 

197 else: 

198 return (x,) 

199 

200 

201def unique_list(seq, hashfunc=None): 

202 seen = set() 

203 seen_add = seen.add 

204 if not hashfunc: 

205 return [x for x in seq if x not in seen and not seen_add(x)] 

206 else: 

207 return [ 

208 x 

209 for x in seq 

210 if hashfunc(x) not in seen and not seen_add(hashfunc(x)) 

211 ] 

212 

213 

214def dedupe_tuple(tup): 

215 return tuple(unique_list(tup)) 

216 

217 

218class memoized_property(object): 

219 

220 """A read-only @property that is only evaluated once.""" 

221 

222 def __init__(self, fget, doc=None): 

223 self.fget = fget 

224 self.__doc__ = doc or fget.__doc__ 

225 self.__name__ = fget.__name__ 

226 

227 def __get__(self, obj, cls): 

228 if obj is None: 

229 return self 

230 obj.__dict__[self.__name__] = result = self.fget(obj) 

231 return result 

232 

233 

234class immutabledict(dict): 

235 def _immutable(self, *arg, **kw): 

236 raise TypeError("%s object is immutable" % self.__class__.__name__) 

237 

238 __delitem__ = ( 

239 __setitem__ 

240 ) = __setattr__ = clear = pop = popitem = setdefault = update = _immutable 

241 

242 def __new__(cls, *args): 

243 new = dict.__new__(cls) 

244 dict.__init__(new, *args) 

245 return new 

246 

247 def __init__(self, *args): 

248 pass 

249 

250 def __reduce__(self): 

251 return immutabledict, (dict(self),) 

252 

253 def union(self, d): 

254 if not self: 

255 return immutabledict(d) 

256 else: 

257 d2 = immutabledict(self) 

258 dict.update(d2, d) 

259 return d2 

260 

261 def __repr__(self): 

262 return "immutabledict(%s)" % dict.__repr__(self) 

263 

264 

265class Dispatcher(object): 

266 def __init__(self, uselist=False): 

267 self._registry = {} 

268 self.uselist = uselist 

269 

270 def dispatch_for(self, target, qualifier="default"): 

271 def decorate(fn): 

272 if self.uselist: 

273 self._registry.setdefault((target, qualifier), []).append(fn) 

274 else: 

275 assert (target, qualifier) not in self._registry 

276 self._registry[(target, qualifier)] = fn 

277 return fn 

278 

279 return decorate 

280 

281 def dispatch(self, obj, qualifier="default"): 

282 

283 if isinstance(obj, string_types): 

284 targets = [obj] 

285 elif isinstance(obj, type): 

286 targets = obj.__mro__ 

287 else: 

288 targets = type(obj).__mro__ 

289 

290 for spcls in targets: 

291 if qualifier != "default" and (spcls, qualifier) in self._registry: 

292 return self._fn_or_list(self._registry[(spcls, qualifier)]) 

293 elif (spcls, "default") in self._registry: 

294 return self._fn_or_list(self._registry[(spcls, "default")]) 

295 else: 

296 raise ValueError("no dispatch function for object: %s" % obj) 

297 

298 def _fn_or_list(self, fn_or_list): 

299 if self.uselist: 

300 

301 def go(*arg, **kw): 

302 for fn in fn_or_list: 

303 fn(*arg, **kw) 

304 

305 return go 

306 else: 

307 return fn_or_list 

308 

309 def branch(self): 

310 """Return a copy of this dispatcher that is independently 

311 writable.""" 

312 

313 d = Dispatcher() 

314 if self.uselist: 

315 d._registry.update( 

316 (k, [fn for fn in self._registry[k]]) for k in self._registry 

317 ) 

318 else: 

319 d._registry.update(self._registry) 

320 return d