Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/sqlalchemy/engine/url.py : 48%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# engine/url.py
2# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
8"""Provides the :class:`~sqlalchemy.engine.url.URL` class which encapsulates
9information about a database connection specification.
11The URL object is created automatically when
12:func:`~sqlalchemy.engine.create_engine` is called with a string
13argument; alternatively, the URL is a public-facing construct which can
14be used directly and is also accepted directly by ``create_engine()``.
15"""
17import re
19from .interfaces import Dialect
20from .. import exc
21from .. import util
22from ..dialects import plugins
23from ..dialects import registry
26class URL(object):
27 """
28 Represent the components of a URL used to connect to a database.
30 This object is suitable to be passed directly to a
31 :func:`~sqlalchemy.create_engine` call. The fields of the URL are parsed
32 from a string by the :func:`.make_url` function. the string
33 format of the URL is an RFC-1738-style string.
35 All initialization parameters are available as public attributes.
37 :param drivername: the name of the database backend.
38 This name will correspond to a module in sqlalchemy/databases
39 or a third party plug-in.
41 :param username: The user name.
43 :param password: database password.
45 :param host: The name of the host.
47 :param port: The port number.
49 :param database: The database name.
51 :param query: A dictionary of options to be passed to the
52 dialect and/or the DBAPI upon connect.
54 """
56 def __init__(
57 self,
58 drivername,
59 username=None,
60 password=None,
61 host=None,
62 port=None,
63 database=None,
64 query=None,
65 ):
66 self.drivername = drivername
67 self.username = username
68 self.password_original = password
69 self.host = host
70 if port is not None:
71 self.port = int(port)
72 else:
73 self.port = None
74 self.database = database
75 self.query = query or {}
77 def __to_string__(self, hide_password=True):
78 s = self.drivername + "://"
79 if self.username is not None:
80 s += _rfc_1738_quote(self.username)
81 if self.password is not None:
82 s += ":" + (
83 "***" if hide_password else _rfc_1738_quote(self.password)
84 )
85 s += "@"
86 if self.host is not None:
87 if ":" in self.host:
88 s += "[%s]" % self.host
89 else:
90 s += self.host
91 if self.port is not None:
92 s += ":" + str(self.port)
93 if self.database is not None:
94 s += "/" + self.database
95 if self.query:
96 keys = list(self.query)
97 keys.sort()
98 s += "?" + "&".join(
99 "%s=%s" % (util.quote_plus(k), util.quote_plus(element))
100 for k in keys
101 for element in util.to_list(self.query[k])
102 )
103 return s
105 def __str__(self):
106 return self.__to_string__(hide_password=False)
108 def __repr__(self):
109 return self.__to_string__()
111 def __hash__(self):
112 return hash(str(self))
114 def __eq__(self, other):
115 return (
116 isinstance(other, URL)
117 and self.drivername == other.drivername
118 and self.username == other.username
119 and self.password == other.password
120 and self.host == other.host
121 and self.database == other.database
122 and self.query == other.query
123 and self.port == other.port
124 )
126 def __ne__(self, other):
127 return not self == other
129 @property
130 def password(self):
131 if self.password_original is None:
132 return None
133 else:
134 return util.text_type(self.password_original)
136 @password.setter
137 def password(self, password):
138 self.password_original = password
140 def get_backend_name(self):
141 if "+" not in self.drivername:
142 return self.drivername
143 else:
144 return self.drivername.split("+")[0]
146 def get_driver_name(self):
147 if "+" not in self.drivername:
148 return self.get_dialect().driver
149 else:
150 return self.drivername.split("+")[1]
152 def _instantiate_plugins(self, kwargs):
153 plugin_names = util.to_list(self.query.get("plugin", ()))
154 plugin_names += kwargs.get("plugins", [])
156 return [
157 plugins.load(plugin_name)(self, kwargs)
158 for plugin_name in plugin_names
159 ]
161 def _get_entrypoint(self):
162 """Return the "entry point" dialect class.
164 This is normally the dialect itself except in the case when the
165 returned class implements the get_dialect_cls() method.
167 """
168 if "+" not in self.drivername:
169 name = self.drivername
170 else:
171 name = self.drivername.replace("+", ".")
172 cls = registry.load(name)
173 # check for legacy dialects that
174 # would return a module with 'dialect' as the
175 # actual class
176 if (
177 hasattr(cls, "dialect")
178 and isinstance(cls.dialect, type)
179 and issubclass(cls.dialect, Dialect)
180 ):
181 return cls.dialect
182 else:
183 return cls
185 def get_dialect(self):
186 """Return the SQLAlchemy database dialect class corresponding
187 to this URL's driver name.
188 """
189 entrypoint = self._get_entrypoint()
190 dialect_cls = entrypoint.get_dialect_cls(self)
191 return dialect_cls
193 def translate_connect_args(self, names=[], **kw):
194 r"""Translate url attributes into a dictionary of connection arguments.
196 Returns attributes of this url (`host`, `database`, `username`,
197 `password`, `port`) as a plain dictionary. The attribute names are
198 used as the keys by default. Unset or false attributes are omitted
199 from the final dictionary.
201 :param \**kw: Optional, alternate key names for url attributes.
203 :param names: Deprecated. Same purpose as the keyword-based alternate
204 names, but correlates the name to the original positionally.
205 """
207 translated = {}
208 attribute_names = ["host", "database", "username", "password", "port"]
209 for sname in attribute_names:
210 if names:
211 name = names.pop(0)
212 elif sname in kw:
213 name = kw[sname]
214 else:
215 name = sname
216 if name is not None and getattr(self, sname, False):
217 translated[name] = getattr(self, sname)
218 return translated
221def make_url(name_or_url):
222 """Given a string or unicode instance, produce a new URL instance.
224 The given string is parsed according to the RFC 1738 spec. If an
225 existing URL object is passed, just returns the object.
226 """
228 if isinstance(name_or_url, util.string_types):
229 return _parse_rfc1738_args(name_or_url)
230 else:
231 return name_or_url
234def _parse_rfc1738_args(name):
235 pattern = re.compile(
236 r"""
237 (?P<name>[\w\+]+)://
238 (?:
239 (?P<username>[^:/]*)
240 (?::(?P<password>.*))?
241 @)?
242 (?:
243 (?:
244 \[(?P<ipv6host>[^/]+)\] |
245 (?P<ipv4host>[^/:]+)
246 )?
247 (?::(?P<port>[^/]*))?
248 )?
249 (?:/(?P<database>.*))?
250 """,
251 re.X,
252 )
254 m = pattern.match(name)
255 if m is not None:
256 components = m.groupdict()
257 if components["database"] is not None:
258 tokens = components["database"].split("?", 2)
259 components["database"] = tokens[0]
261 if len(tokens) > 1:
262 query = {}
264 for key, value in util.parse_qsl(tokens[1]):
265 if util.py2k:
266 key = key.encode("ascii")
267 if key in query:
268 query[key] = util.to_list(query[key])
269 query[key].append(value)
270 else:
271 query[key] = value
272 else:
273 query = None
274 else:
275 query = None
276 components["query"] = query
278 if components["username"] is not None:
279 components["username"] = _rfc_1738_unquote(components["username"])
281 if components["password"] is not None:
282 components["password"] = _rfc_1738_unquote(components["password"])
284 ipv4host = components.pop("ipv4host")
285 ipv6host = components.pop("ipv6host")
286 components["host"] = ipv4host or ipv6host
287 name = components.pop("name")
288 return URL(name, **components)
289 else:
290 raise exc.ArgumentError(
291 "Could not parse rfc1738 URL from string '%s'" % name
292 )
295def _rfc_1738_quote(text):
296 return re.sub(r"[:@/]", lambda m: "%%%X" % ord(m.group(0)), text)
299def _rfc_1738_unquote(text):
300 return util.unquote(text)
303def _parse_keyvalue_args(name):
304 m = re.match(r"(\w+)://(.*)", name)
305 if m is not None:
306 (name, args) = m.group(1, 2)
307 opts = dict(util.parse_qsl(args))
308 return URL(name, *opts)
309 else:
310 return None