Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

# postgresql/pygresql.py
# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

7""" 

8.. dialect:: postgresql+pygresql 

9 :name: pygresql 

10 :dbapi: pgdb 

11 :connectstring: postgresql+pygresql://user:password@host:port/dbname[?key=value&key=value...] 

12 :url: http://www.pygresql.org/ 

13 

14.. note:: 

15 

16 The pygresql dialect is **not tested as part of SQLAlchemy's continuous 

17 integration** and may have unresolved issues. The recommended PostgreSQL 

18 dialect is psycopg2. 

19 

20""" # noqa 

21 

22import decimal 

23import re 

24 

25from .base import _DECIMAL_TYPES 

26from .base import _FLOAT_TYPES 

27from .base import _INT_TYPES 

28from .base import PGCompiler 

29from .base import PGDialect 

30from .base import PGIdentifierPreparer 

31from .base import UUID 

32from .hstore import HSTORE 

33from .json import JSON 

34from .json import JSONB 

35from ... import exc 

36from ... import processors 

37from ... import util 

38from ...sql.elements import Null 

39from ...types import JSON as Json 

40from ...types import Numeric 

41 

42 

class _PGNumeric(Numeric):
    """Numeric type whose result conversion depends on the column OID."""

    def bind_processor(self, dialect):
        """Return ``None``; no bind-side converter is installed."""
        return None

    def result_processor(self, dialect, coltype):
        """Choose the result converter for ``coltype``.

        :param dialect: the dialect in use (not consulted here).
        :param coltype: an integer type OID, or an object exposing the
         OID as ``.oid``.
        :return: a converter callable, or ``None`` when the value is
         passed through unchanged.
        :raises exc.InvalidRequestError: if the OID is in none of the
         float, decimal or integer OID collections.
        """
        want_decimal = self.asdecimal
        oid = coltype if isinstance(coltype, int) else coltype.oid

        if oid in _FLOAT_TYPES:
            if want_decimal:
                return processors.to_decimal_processor_factory(
                    decimal.Decimal, self._effective_decimal_return_scale
                )
            # PyGreSQL returns float natively for 701 (float8)
            return None

        if oid in _DECIMAL_TYPES or oid in _INT_TYPES:
            if want_decimal:
                # PyGreSQL returns Decimal natively for 1700 (numeric)
                return None
            return processors.to_float

        raise exc.InvalidRequestError("Unknown PG numeric type: %d" % oid)

72 

73 

class _PGHStore(HSTORE):
    """HSTORE type that uses the DBAPI's ``Hstore`` wrapper if enabled."""

    def bind_processor(self, dialect):
        """Return the bind converter for hstore values.

        When ``dialect.has_native_hstore`` is false the inherited
        processor is used.  Otherwise ``dict`` values are wrapped in
        ``dialect.dbapi.Hstore`` and anything else is passed unchanged.
        """
        if dialect.has_native_hstore:
            wrap = dialect.dbapi.Hstore

            def process(value):
                return wrap(value) if isinstance(value, dict) else value

            return process
        return super(_PGHStore, self).bind_processor(dialect)

    def result_processor(self, dialect, coltype):
        """Return the inherited result converter, or ``None`` when
        ``dialect.has_native_hstore`` is true.
        """
        if dialect.has_native_hstore:
            return None
        return super(_PGHStore, self).result_processor(dialect, coltype)

90 

91 

class _PGJSON(JSON):
    """JSON type that uses the DBAPI's ``Json`` wrapper if enabled."""

    def bind_processor(self, dialect):
        """Return the bind converter for JSON values.

        When ``dialect.has_native_json`` is false the inherited
        processor is used.  Otherwise the returned callable maps:

        * ``self.NULL`` to ``Json(None)``;
        * a ``Null`` element to ``None``;
        * ``None`` to ``None`` if ``self.none_as_null`` is true, else
          to ``Json(None)``;
        * ``dict`` / ``list`` values to ``Json(value)``;
        * anything else to itself.
        """
        if not dialect.has_native_json:
            return super(_PGJSON, self).bind_processor(dialect)
        wrap = dialect.dbapi.Json

        def process(value):
            # The NULL marker is tested first, exactly as in the
            # if/elif chain this replaces.
            if value is self.NULL:
                return wrap(None)
            if isinstance(value, Null):
                return None
            if value is None:
                return None if self.none_as_null else wrap(None)
            if isinstance(value, (dict, list)):
                return wrap(value)
            return value

        return process

    def result_processor(self, dialect, coltype):
        """Return the inherited result converter, or ``None`` when
        ``dialect.has_native_json`` is true.
        """
        if dialect.has_native_json:
            return None
        return super(_PGJSON, self).result_processor(dialect, coltype)

114 

115 

class _PGJSONB(JSONB):
    """JSONB type that uses the DBAPI's ``Json`` wrapper if enabled."""

    def bind_processor(self, dialect):
        """Return the bind converter for JSONB values.

        When ``dialect.has_native_json`` is false the inherited
        processor is used.  Otherwise the returned callable maps:

        * ``self.NULL`` to ``Json(None)``;
        * a ``Null`` element to ``None``;
        * ``None`` to ``None`` if ``self.none_as_null`` is true, else
          to ``Json(None)``;
        * ``dict`` / ``list`` values to ``Json(value)``;
        * anything else to itself.
        """
        if not dialect.has_native_json:
            return super(_PGJSONB, self).bind_processor(dialect)
        wrap = dialect.dbapi.Json

        def process(value):
            # The NULL marker is tested first, exactly as in the
            # if/elif chain this replaces.
            if value is self.NULL:
                return wrap(None)
            if isinstance(value, Null):
                return None
            if value is None:
                return None if self.none_as_null else wrap(None)
            if isinstance(value, (dict, list)):
                return wrap(value)
            return value

        return process

    def result_processor(self, dialect, coltype):
        """Return the inherited result converter, or ``None`` when
        ``dialect.has_native_json`` is true.
        """
        if dialect.has_native_json:
            return None
        return super(_PGJSONB, self).result_processor(dialect, coltype)

138 

139 

class _PGUUID(UUID):
    """UUID type that uses the DBAPI's ``Uuid`` constructor if enabled."""

    def bind_processor(self, dialect):
        """Return the bind converter for UUID values.

        When ``dialect.has_native_uuid`` is false the inherited
        processor is used.  Otherwise the returned callable builds a
        ``dialect.dbapi.Uuid`` from ``str`` / ``bytes`` / ``int`` input
        and passes every other value (including ``None``) unchanged.
        """
        if not dialect.has_native_uuid:
            return super(_PGUUID, self).bind_processor(dialect)
        make_uuid = dialect.dbapi.Uuid

        def process(value):
            if value is None:
                return None
            if isinstance(value, (str, bytes)):
                # A length of exactly 16 is taken to be the raw byte
                # form; any other length goes to the positional
                # constructor argument.
                if len(value) == 16:
                    return make_uuid(bytes=value)
                return make_uuid(value)
            return make_uuid(int=value) if isinstance(value, int) else value

        return process

    def result_processor(self, dialect, coltype):
        """Return the result converter for UUID values.

        When ``dialect.has_native_uuid`` is false the inherited
        processor is used.  Otherwise ``None`` is returned if
        ``self.as_uuid`` is true, else a callable that turns each
        non-``None`` value into ``str(value)``.
        """
        if not dialect.has_native_uuid:
            return super(_PGUUID, self).result_processor(dialect, coltype)
        if self.as_uuid:
            return None

        def process(value):
            return None if value is None else str(value)

        return process

169 

170 

class _PGCompiler(PGCompiler):
    """Statement compiler that emits percent signs in doubled form."""

    def visit_mod_binary(self, binary, operator, **kw):
        """Render a modulo expression as ``<left> %% <right>``."""
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return lhs + " %% " + rhs

    def post_process_text(self, text):
        """Return ``text`` with every ``%`` replaced by ``%%``."""
        doubled = text.replace("%", "%%")
        return doubled

181 

182 

class _PGIdentifierPreparer(PGIdentifierPreparer):
    """Identifier preparer that also doubles percent signs."""

    def _escape_identifier(self, value):
        """Escape quote characters in ``value``, then double every ``%``."""
        return value.replace(
            self.escape_quote, self.escape_to_quote
        ).replace("%", "%%")

187 

188 

class PGDialect_pygresql(PGDialect):
    """PostgreSQL dialect that talks to the database through ``pgdb``."""

    driver = "pygresql"

    statement_compiler = _PGCompiler
    preparer = _PGIdentifierPreparer

    @classmethod
    def dbapi(cls):
        """Import and return the ``pgdb`` module."""
        import pgdb

        return pgdb

    # Base colspecs with the PyGreSQL-specific type adaptations layered
    # on top.
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            Numeric: _PGNumeric,
            HSTORE: _PGHStore,
            Json: _PGJSON,
            JSON: _PGJSON,
            JSONB: _PGJSONB,
            UUID: _PGUUID,
        },
    )

    def __init__(self, **kwargs):
        """Set up the dialect and derive feature flags from the DBAPI
        version.

        The leading ``major.minor`` of ``self.dbapi.version`` is stored
        as ``self.dbapi_version``; a missing or unparsable version is
        stored as ``(0, 0)``.  From 5.0 on, the unicode support flags and
        the native hstore / JSON / UUID flags are switched on.  Older
        versions get the native flags switched off, plus a warning unless
        the version is ``(0, 0)``.
        """
        super(PGDialect_pygresql, self).__init__(**kwargs)
        try:
            # A failed match leaves ``found`` as None; the resulting
            # AttributeError is handled together with the other errors.
            found = re.match(r"(\d+)\.(\d+)", self.dbapi.version)
            version = (int(found.group(1)), int(found.group(2)))
        except (AttributeError, ValueError, TypeError):
            version = (0, 0)
        self.dbapi_version = version

        native = version >= (5, 0)
        if native:
            self.supports_unicode_statements = True
            self.supports_unicode_binds = True
        elif version != (0, 0):
            util.warn(
                "PyGreSQL is only fully supported by SQLAlchemy"
                " since version 5.0."
            )
        self.has_native_hstore = native
        self.has_native_json = native
        self.has_native_uuid = native

    def create_connect_args(self, url):
        """Translate ``url`` into ``([], keyword_options)`` for connecting.

        If a port is present it is folded into the ``host`` option as
        ``host:port`` and removed as a separate key; the URL's query
        items are then merged into the options.
        """
        opts = url.translate_connect_args(username="user")
        if "port" in opts:
            # Drop anything after the last ":" in the host before
            # appending the port.
            bare_host = opts.get("host", "").rsplit(":", 1)[0]
            opts["host"] = "%s:%s" % (bare_host, opts.pop("port"))
        opts.update(url.query)
        return [], opts

    def is_disconnect(self, e, connection, cursor):
        """Report whether ``e`` occurred on a closed connection.

        Returns ``False`` unless ``e`` is a ``dbapi.Error`` and a truthy
        connection is available; otherwise returns the connection's
        ``closed`` attribute, or ``_cnx is None`` when ``closed`` does
        not exist.
        """
        if not isinstance(e, self.dbapi.Error) or not connection:
            return False
        try:
            # NOTE(review): presumably unwraps a pool-level wrapper to
            # reach the raw DBAPI connection -- confirm against callers.
            inner = connection.connection
        except AttributeError:
            inner = connection
        else:
            if not inner:
                return False
        try:
            return inner.closed
        except AttributeError:  # PyGreSQL < 5.0
            return inner._cnx is None

264 

265 

# Module-level alias under which this dialect class is exposed.
dialect = PGDialect_pygresql