Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/sqlalchemy/util/compat.py : 44%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# util/compat.py
2# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
8"""Handle Python version/platform incompatibilities."""
10import collections
11import contextlib
12import inspect
13import operator
14import sys
17py36 = sys.version_info >= (3, 6)
18py33 = sys.version_info >= (3, 3)
19py35 = sys.version_info >= (3, 5)
20py32 = sys.version_info >= (3, 2)
21py3k = sys.version_info >= (3, 0)
22py2k = sys.version_info < (3, 0)
23py265 = sys.version_info >= (2, 6, 5)
24jython = sys.platform.startswith("java")
25pypy = hasattr(sys, "pypy_version_info")
27win32 = sys.platform.startswith("win")
28cpython = not pypy and not jython # TODO: something better for this ?
30contextmanager = contextlib.contextmanager
31dottedgetter = operator.attrgetter
32namedtuple = collections.namedtuple
33next = next # noqa
35FullArgSpec = collections.namedtuple(
36 "FullArgSpec",
37 [
38 "args",
39 "varargs",
40 "varkw",
41 "defaults",
42 "kwonlyargs",
43 "kwonlydefaults",
44 "annotations",
45 ],
46)
48try:
49 import threading
50except ImportError:
51 import dummy_threading as threading # noqa
54# work around http://bugs.python.org/issue2646
55if py265:
56 safe_kwarg = lambda arg: arg # noqa
57else:
58 safe_kwarg = str
61def inspect_getfullargspec(func):
62 """Fully vendored version of getfullargspec from Python 3.3."""
64 if inspect.ismethod(func):
65 func = func.__func__
66 if not inspect.isfunction(func):
67 raise TypeError("{!r} is not a Python function".format(func))
69 co = func.__code__
70 if not inspect.iscode(co):
71 raise TypeError("{!r} is not a code object".format(co))
73 nargs = co.co_argcount
74 names = co.co_varnames
75 nkwargs = co.co_kwonlyargcount if py3k else 0
76 args = list(names[:nargs])
77 kwonlyargs = list(names[nargs : nargs + nkwargs])
79 nargs += nkwargs
80 varargs = None
81 if co.co_flags & inspect.CO_VARARGS:
82 varargs = co.co_varnames[nargs]
83 nargs = nargs + 1
84 varkw = None
85 if co.co_flags & inspect.CO_VARKEYWORDS:
86 varkw = co.co_varnames[nargs]
88 return FullArgSpec(
89 args,
90 varargs,
91 varkw,
92 func.__defaults__,
93 kwonlyargs,
94 func.__kwdefaults__ if py3k else None,
95 func.__annotations__ if py3k else {},
96 )
99if py3k:
100 import base64
101 import builtins
102 import configparser
103 import itertools
104 import pickle
106 from functools import reduce
107 from io import BytesIO as byte_buffer
108 from io import StringIO
109 from itertools import zip_longest
110 from urllib.parse import (
111 quote_plus,
112 unquote_plus,
113 parse_qsl,
114 quote,
115 unquote,
116 )
118 string_types = (str,)
119 binary_types = (bytes,)
120 binary_type = bytes
121 text_type = str
122 int_types = (int,)
123 iterbytes = iter
125 itertools_filterfalse = itertools.filterfalse
126 itertools_filter = filter
127 itertools_imap = map
129 exec_ = getattr(builtins, "exec")
130 import_ = getattr(builtins, "__import__")
131 print_ = getattr(builtins, "print")
133 def b(s):
134 return s.encode("latin-1")
136 def b64decode(x):
137 return base64.b64decode(x.encode("ascii"))
139 def b64encode(x):
140 return base64.b64encode(x).decode("ascii")
142 def decode_backslashreplace(text, encoding):
143 return text.decode(encoding, errors="backslashreplace")
145 def cmp(a, b):
146 return (a > b) - (a < b)
148 def raise_(
149 exception, with_traceback=None, replace_context=None, from_=False
150 ):
151 r"""implement "raise" with cause support.
153 :param exception: exception to raise
154 :param with_traceback: will call exception.with_traceback()
155 :param replace_context: an as-yet-unsupported feature. This is
156 an exception object which we are "replacing", e.g., it's our
157 "cause" but we don't want it printed. Basically just what
158 ``__suppress_context__`` does but we don't want to suppress
159 the enclosing context, if any. So for now we make it the
160 cause.
161 :param from\_: the cause. this actually sets the cause and doesn't
162 hope to hide it someday.
164 """
165 if with_traceback is not None:
166 exception = exception.with_traceback(with_traceback)
168 if from_ is not False:
169 exception.__cause__ = from_
170 elif replace_context is not None:
171 # no good solution here, we would like to have the exception
172 # have only the context of replace_context.__context__ so that the
173 # intermediary exception does not change, but we can't figure
174 # that out.
175 exception.__cause__ = replace_context
177 try:
178 raise exception
179 finally:
180 # credit to
181 # https://cosmicpercolator.com/2016/01/13/exception-leaks-in-python-2-and-3/
182 # as the __traceback__ object creates a cycle
183 del exception, replace_context, from_, with_traceback
185 def u(s):
186 return s
188 def ue(s):
189 return s
191 if py32:
192 callable = callable # noqa
193 else:
195 def callable(fn): # noqa
196 return hasattr(fn, "__call__")
199else:
200 import base64
201 import ConfigParser as configparser # noqa
202 import itertools
204 from StringIO import StringIO # noqa
205 from cStringIO import StringIO as byte_buffer # noqa
206 from itertools import izip_longest as zip_longest # noqa
207 from urllib import quote # noqa
208 from urllib import quote_plus # noqa
209 from urllib import unquote # noqa
210 from urllib import unquote_plus # noqa
211 from urlparse import parse_qsl # noqa
213 try:
214 import cPickle as pickle
215 except ImportError:
216 import pickle # noqa
218 string_types = (basestring,) # noqa
219 binary_types = (bytes,)
220 binary_type = str
221 text_type = unicode # noqa
222 int_types = int, long # noqa
224 callable = callable # noqa
225 cmp = cmp # noqa
226 reduce = reduce # noqa
228 b64encode = base64.b64encode
229 b64decode = base64.b64decode
231 itertools_filterfalse = itertools.ifilterfalse
232 itertools_filter = itertools.ifilter
233 itertools_imap = itertools.imap
235 def b(s):
236 return s
238 def exec_(func_text, globals_, lcl=None):
239 if lcl is None:
240 exec("exec func_text in globals_")
241 else:
242 exec("exec func_text in globals_, lcl")
244 def iterbytes(buf):
245 return (ord(byte) for byte in buf)
247 def import_(*args):
248 if len(args) == 4:
249 args = args[0:3] + ([str(arg) for arg in args[3]],)
250 return __import__(*args)
252 def print_(*args, **kwargs):
253 fp = kwargs.pop("file", sys.stdout)
254 if fp is None:
255 return
256 for arg in enumerate(args):
257 if not isinstance(arg, basestring): # noqa
258 arg = str(arg)
259 fp.write(arg)
261 def u(s):
262 # this differs from what six does, which doesn't support non-ASCII
263 # strings - we only use u() with
264 # literal source strings, and all our source files with non-ascii
265 # in them (all are tests) are utf-8 encoded.
266 return unicode(s, "utf-8") # noqa
268 def ue(s):
269 return unicode(s, "unicode_escape") # noqa
271 def decode_backslashreplace(text, encoding):
272 try:
273 return text.decode(encoding)
274 except UnicodeDecodeError:
275 # regular "backslashreplace" for an incompatible encoding raises:
276 # "TypeError: don't know how to handle UnicodeDecodeError in
277 # error callback"
278 return repr(text)[1:-1].decode()
280 def safe_bytestring(text):
281 # py2k only
282 if not isinstance(text, string_types):
283 return unicode(text).encode("ascii", errors="backslashreplace")
284 elif isinstance(text, unicode):
285 return text.encode("ascii", errors="backslashreplace")
286 else:
287 return text
289 exec(
290 "def raise_(exception, with_traceback=None, replace_context=None, "
291 "from_=False):\n"
292 " if with_traceback:\n"
293 " raise type(exception), exception, with_traceback\n"
294 " else:\n"
295 " raise exception\n"
296 )
299if py35:
301 def _formatannotation(annotation, base_module=None):
302 """vendored from python 3.7
303 """
305 if getattr(annotation, "__module__", None) == "typing":
306 return repr(annotation).replace("typing.", "")
307 if isinstance(annotation, type):
308 if annotation.__module__ in ("builtins", base_module):
309 return annotation.__qualname__
310 return annotation.__module__ + "." + annotation.__qualname__
311 return repr(annotation)
313 def inspect_formatargspec(
314 args,
315 varargs=None,
316 varkw=None,
317 defaults=None,
318 kwonlyargs=(),
319 kwonlydefaults={},
320 annotations={},
321 formatarg=str,
322 formatvarargs=lambda name: "*" + name,
323 formatvarkw=lambda name: "**" + name,
324 formatvalue=lambda value: "=" + repr(value),
325 formatreturns=lambda text: " -> " + text,
326 formatannotation=_formatannotation,
327 ):
328 """Copy formatargspec from python 3.7 standard library.
330 Python 3 has deprecated formatargspec and requested that Signature
331 be used instead, however this requires a full reimplementation
332 of formatargspec() in terms of creating Parameter objects and such.
333 Instead of introducing all the object-creation overhead and having
334 to reinvent from scratch, just copy their compatibility routine.
336 Utimately we would need to rewrite our "decorator" routine completely
337 which is not really worth it right now, until all Python 2.x support
338 is dropped.
340 """
342 def formatargandannotation(arg):
343 result = formatarg(arg)
344 if arg in annotations:
345 result += ": " + formatannotation(annotations[arg])
346 return result
348 specs = []
349 if defaults:
350 firstdefault = len(args) - len(defaults)
351 for i, arg in enumerate(args):
352 spec = formatargandannotation(arg)
353 if defaults and i >= firstdefault:
354 spec = spec + formatvalue(defaults[i - firstdefault])
355 specs.append(spec)
357 if varargs is not None:
358 specs.append(formatvarargs(formatargandannotation(varargs)))
359 else:
360 if kwonlyargs:
361 specs.append("*")
363 if kwonlyargs:
364 for kwonlyarg in kwonlyargs:
365 spec = formatargandannotation(kwonlyarg)
366 if kwonlydefaults and kwonlyarg in kwonlydefaults:
367 spec += formatvalue(kwonlydefaults[kwonlyarg])
368 specs.append(spec)
370 if varkw is not None:
371 specs.append(formatvarkw(formatargandannotation(varkw)))
373 result = "(" + ", ".join(specs) + ")"
374 if "return" in annotations:
375 result += formatreturns(formatannotation(annotations["return"]))
376 return result
379elif py2k:
380 from inspect import formatargspec as _inspect_formatargspec
382 def inspect_formatargspec(*spec, **kw):
383 # convert for a potential FullArgSpec from compat.getfullargspec()
384 return _inspect_formatargspec(*spec[0:4], **kw) # noqa
387else:
388 from inspect import formatargspec as inspect_formatargspec # noqa
391# Fix deprecation of accessing ABCs straight from collections module
392# (which will stop working in 3.8).
393if py33:
394 import collections.abc as collections_abc
395else:
396 import collections as collections_abc # noqa
399@contextlib.contextmanager
400def nested(*managers):
401 """Implement contextlib.nested, mostly for unit tests.
403 As tests still need to run on py2.6 we can't use multiple-with yet.
405 Function is removed in py3k but also emits deprecation warning in 2.7
406 so just roll it here for everyone.
408 """
410 exits = []
411 vars_ = []
412 exc = (None, None, None)
413 try:
414 for mgr in managers:
415 exit_ = mgr.__exit__
416 enter = mgr.__enter__
417 vars_.append(enter())
418 exits.append(exit_)
419 yield vars_
420 except:
421 exc = sys.exc_info()
422 finally:
423 while exits:
424 exit_ = exits.pop() # noqa
425 try:
426 if exit_(*exc):
427 exc = (None, None, None)
428 except:
429 exc = sys.exc_info()
430 if exc != (None, None, None):
431 reraise(exc[0], exc[1], exc[2])
434def raise_from_cause(exception, exc_info=None):
435 r"""legacy. use raise\_()"""
437 if exc_info is None:
438 exc_info = sys.exc_info()
439 exc_type, exc_value, exc_tb = exc_info
440 cause = exc_value if exc_value is not exception else None
441 reraise(type(exception), exception, tb=exc_tb, cause=cause)
444def reraise(tp, value, tb=None, cause=None):
445 r"""legacy. use raise\_()"""
447 raise_(value, with_traceback=tb, from_=cause)
450def with_metaclass(meta, *bases):
451 """Create a base class with a metaclass.
453 Drops the middle class upon creation.
455 Source: http://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/
457 """
459 class metaclass(meta):
460 __call__ = type.__call__
461 __init__ = type.__init__
463 def __new__(cls, name, this_bases, d):
464 if this_bases is None:
465 return type.__new__(cls, name, (), d)
466 return meta(name, bases, d)
468 return metaclass("temporary_class", None, {})
471if py3k:
472 from datetime import timezone
473else:
474 from datetime import datetime
475 from datetime import timedelta
476 from datetime import tzinfo
478 class timezone(tzinfo):
479 """Minimal port of python 3 timezone object"""
481 __slots__ = "_offset"
483 def __init__(self, offset):
484 if not isinstance(offset, timedelta):
485 raise TypeError("offset must be a timedelta")
486 if not self._minoffset <= offset <= self._maxoffset:
487 raise ValueError(
488 "offset must be a timedelta "
489 "strictly between -timedelta(hours=24) and "
490 "timedelta(hours=24)."
491 )
492 self._offset = offset
494 def __eq__(self, other):
495 if type(other) != timezone:
496 return False
497 return self._offset == other._offset
499 def __hash__(self):
500 return hash(self._offset)
502 def __repr__(self):
503 return "sqlalchemy.util.%s(%r)" % (
504 self.__class__.__name__,
505 self._offset,
506 )
508 def __str__(self):
509 return self.tzname(None)
511 def utcoffset(self, dt):
512 return self._offset
514 def tzname(self, dt):
515 return self._name_from_offset(self._offset)
517 def dst(self, dt):
518 return None
520 def fromutc(self, dt):
521 if isinstance(dt, datetime):
522 if dt.tzinfo is not self:
523 raise ValueError("fromutc: dt.tzinfo " "is not self")
524 return dt + self._offset
525 raise TypeError(
526 "fromutc() argument must be a datetime instance" " or None"
527 )
529 @staticmethod
530 def _timedelta_to_microseconds(timedelta):
531 """backport of timedelta._to_microseconds()"""
532 return (
533 timedelta.days * (24 * 3600) + timedelta.seconds
534 ) * 1000000 + timedelta.microseconds
536 @staticmethod
537 def _divmod_timedeltas(a, b):
538 """backport of timedelta.__divmod__"""
540 q, r = divmod(
541 timezone._timedelta_to_microseconds(a),
542 timezone._timedelta_to_microseconds(b),
543 )
544 return q, timedelta(0, 0, r)
546 @staticmethod
547 def _name_from_offset(delta):
548 if not delta:
549 return "UTC"
550 if delta < timedelta(0):
551 sign = "-"
552 delta = -delta
553 else:
554 sign = "+"
555 hours, rest = timezone._divmod_timedeltas(
556 delta, timedelta(hours=1)
557 )
558 minutes, rest = timezone._divmod_timedeltas(
559 rest, timedelta(minutes=1)
560 )
561 result = "UTC%s%02d:%02d" % (sign, hours, minutes)
562 if rest.seconds:
563 result += ":%02d" % (rest.seconds,)
564 if rest.microseconds:
565 result += ".%06d" % (rest.microseconds,)
566 return result
568 _maxoffset = timedelta(hours=23, minutes=59)
569 _minoffset = -_maxoffset
571 timezone.utc = timezone(timedelta(0))