Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# postgresql/psycopg2.py 

2# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: http://www.opensource.org/licenses/mit-license.php 

7r""" 

8.. dialect:: postgresql+psycopg2 

9 :name: psycopg2 

10 :dbapi: psycopg2 

11 :connectstring: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...] 

12 :url: http://pypi.python.org/pypi/psycopg2/ 

13 

14psycopg2 Connect Arguments 

15----------------------------------- 

16 

17psycopg2-specific keyword arguments which are accepted by 

18:func:`_sa.create_engine()` are: 

19 

20* ``server_side_cursors``: Enable the usage of "server side cursors" for SQL 

21 statements which support this feature. What this essentially means from a 

22 psycopg2 point of view is that the cursor is created using a name, e.g. 

23 ``connection.cursor('some name')``, which has the effect that result rows 

24 are not immediately pre-fetched and buffered after statement execution, but 

25 are instead left on the server and only retrieved as needed. SQLAlchemy's 

26 :class:`~sqlalchemy.engine.ResultProxy` uses special row-buffering 

27 behavior when this feature is enabled, such that groups of 100 rows at a 

28 time are fetched over the wire to reduce conversational overhead. 

29 Note that the :paramref:`.Connection.execution_options.stream_results` 

30 execution option is a more targeted 

31 way of enabling this mode on a per-execution basis. 

32 

33* ``use_native_unicode``: Enable the usage of Psycopg2 "native unicode" mode 

34 per connection. True by default. 

35 

36 .. seealso:: 

37 

38 :ref:`psycopg2_disable_native_unicode` 

39 

40* ``isolation_level``: This option, available for all PostgreSQL dialects, 

41 includes the ``AUTOCOMMIT`` isolation level when using the psycopg2 

42 dialect. 

43 

44 .. seealso:: 

45 

46 :ref:`psycopg2_isolation_level` 

47 

48* ``client_encoding``: sets the client encoding in a libpq-agnostic way, 

49 using psycopg2's ``set_client_encoding()`` method. 

50 

51 .. seealso:: 

52 

53 :ref:`psycopg2_unicode` 

54 

55* ``executemany_mode``, ``executemany_batch_page_size``, 

56 ``executemany_values_page_size``: Allows use of psycopg2 

57 extensions for optimizing "executemany"-stye queries. See the referenced 

58 section below for details. 

59 

60 .. seealso:: 

61 

62 :ref:`psycopg2_executemany_mode` 

63 

64* ``use_batch_mode``: this is the previous setting used to affect "executemany" 

65 mode and is now deprecated. 

66 

67 

68Unix Domain Connections 

69------------------------ 

70 

71psycopg2 supports connecting via Unix domain connections. When the ``host`` 

72portion of the URL is omitted, SQLAlchemy passes ``None`` to psycopg2, 

73which specifies Unix-domain communication rather than TCP/IP communication:: 

74 

75 create_engine("postgresql+psycopg2://user:password@/dbname") 

76 

77By default, the socket file used is to connect to a Unix-domain socket 

78in ``/tmp``, or whatever socket directory was specified when PostgreSQL 

79was built. This value can be overridden by passing a pathname to psycopg2, 

80using ``host`` as an additional keyword argument:: 

81 

82 create_engine("postgresql+psycopg2://user:password@/dbname?host=/var/lib/postgresql") 

83 

84.. seealso:: 

85 

86 `PQconnectdbParams \ 

87 <http://www.postgresql.org/docs/9.1/static/libpq-connect.html#LIBPQ-PQCONNECTDBPARAMS>`_ 

88 

89Empty DSN Connections / Environment Variable Connections 

90--------------------------------------------------------- 

91 

92The psycopg2 DBAPI can connect to PostgreSQL by passing an empty DSN to the 

93libpq client library, which by default indicates to connect to a localhost 

94PostgreSQL database that is open for "trust" connections. This behavior can be 

95further tailored using a particular set of environment variables which are 

96prefixed with ``PG_...``, which are consumed by ``libpq`` to take the place of 

97any or all elements of the connection string. 

98 

99For this form, the URL can be passed without any elements other than the 

100initial scheme:: 

101 

102 engine = create_engine('postgresql+psycopg2://') 

103 

104In the above form, a blank "dsn" string is passed to the ``psycopg2.connect()`` 

105function which in turn represents an empty DSN passed to libpq. 

106 

107.. versionadded:: 1.3.2 support for parameter-less connections with psycopg2. 

108 

109.. seealso:: 

110 

111 `Environment Variables\ 

112 <https://www.postgresql.org/docs/current/libpq-envars.html>`_ - 

113 PostgreSQL documentation on how to use ``PG_...`` 

114 environment variables for connections. 

115 

116.. _psycopg2_execution_options: 

117 

118Per-Statement/Connection Execution Options 

119------------------------------------------- 

120 

121The following DBAPI-specific options are respected when used with 

122:meth:`_engine.Connection.execution_options`, 

123:meth:`.Executable.execution_options`, 

124:meth:`_query.Query.execution_options`, 

125in addition to those not specific to DBAPIs: 

126 

127* ``isolation_level`` - Set the transaction isolation level for the lifespan 

128 of a :class:`_engine.Connection` (can only be set on a connection, 

129 not a statement 

130 or query). See :ref:`psycopg2_isolation_level`. 

131 

132* ``stream_results`` - Enable or disable usage of psycopg2 server side 

133 cursors - this feature makes use of "named" cursors in combination with 

134 special result handling methods so that result rows are not fully buffered. 

135 If ``None`` or not set, the ``server_side_cursors`` option of the 

136 :class:`_engine.Engine` is used. 

137 

138* ``max_row_buffer`` - when using ``stream_results``, an integer value that 

139 specifies the maximum number of rows to buffer at a time. This is 

140 interpreted by the :class:`.BufferedRowResultProxy`, and if omitted the 

141 buffer will grow to ultimately store 1000 rows at a time. 

142 

143 .. versionadded:: 1.0.6 

144 

145.. _psycopg2_batch_mode: 

146 

147.. _psycopg2_executemany_mode: 

148 

149Psycopg2 Fast Execution Helpers 

150------------------------------- 

151 

152Modern versions of psycopg2 include a feature known as 

153`Fast Execution Helpers \ 

154<http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which 

155have been shown in benchmarking to improve psycopg2's executemany() 

156performance, primarily with INSERT statements, by multiple orders of magnitude. 

157SQLAlchemy allows this extension to be used for all ``executemany()`` style 

158calls invoked by an :class:`_engine.Engine` 

159when used with :ref:`multiple parameter 

160sets <execute_multiple>`, which includes the use of this feature both by the 

161Core as well as by the ORM for inserts of objects with non-autogenerated 

162primary key values, by adding the ``executemany_mode`` flag to 

163:func:`_sa.create_engine`:: 

164 

165 engine = create_engine( 

166 "postgresql+psycopg2://scott:tiger@host/dbname", 

167 executemany_mode='batch') 

168 

169 

170.. versionchanged:: 1.3.7 - the ``use_batch_mode`` flag has been superseded 

171 by a new parameter ``executemany_mode`` which provides support both for 

172 psycopg2's ``execute_batch`` helper as well as the ``execute_values`` 

173 helper. 

174 

175Possible options for ``executemany_mode`` include: 

176 

177* ``None`` - By default, psycopg2's extensions are not used, and the usual 

178 ``cursor.executemany()`` method is used when invoking batches of statements. 

179 

180* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` so that multiple copies 

181 of a SQL query, each one corresponding to a parameter set passed to 

182 ``executemany()``, are joined into a single SQL string separated by a 

183 semicolon. This is the same behavior as was provided by the 

184 ``use_batch_mode=True`` flag. 

185 

186* ``'values'``- For Core :func:`_expression.insert` 

187 constructs only (including those 

188 emitted by the ORM automatically), the ``psycopg2.extras.execute_values`` 

189 extension is used so that multiple parameter sets are grouped into a single 

190 INSERT statement and joined together with multiple VALUES expressions. This 

191 method requires that the string text of the VALUES clause inside the 

192 INSERT statement is manipulated, so is only supported with a compiled 

193 :func:`_expression.insert` construct where the format is predictable. 

194 For all other 

195 constructs, including plain textual INSERT statements not rendered by the 

196 SQLAlchemy expression language compiler, the 

197 ``psycopg2.extras.execute_batch`` method is used. It is therefore important 

198 to note that **"values" mode implies that "batch" mode is also used for 

199 all statements for which "values" mode does not apply**. 

200 

201For both strategies, the ``executemany_batch_page_size`` and 

202``executemany_values_page_size`` arguments control how many parameter sets 

203should be represented in each execution. Because "values" mode implies a 

204fallback down to "batch" mode for non-INSERT statements, there are two 

205independent page size arguments. For each, the default value of ``None`` means 

206to use psycopg2's defaults, which at the time of this writing are quite low at 

207100. For the ``execute_values`` method, a number as high as 10000 may prove 

208to be performant, whereas for ``execute_batch``, as the number represents 

209full statements repeated, a number closer to the default of 100 is likely 

210more appropriate:: 

211 

212 engine = create_engine( 

213 "postgresql+psycopg2://scott:tiger@host/dbname", 

214 executemany_mode='values', 

215 executemany_values_page_size=10000, executemany_batch_page_size=500) 

216 

217 

218.. seealso:: 

219 

220 :ref:`execute_multiple` - General information on using the 

221 :class:`_engine.Connection` 

222 object to execute statements in such a way as to make 

223 use of the DBAPI ``.executemany()`` method. 

224 

225.. versionchanged:: 1.3.7 - Added support for 

226 ``psycopg2.extras.execute_values``. The ``use_batch_mode`` flag is 

227 superseded by the ``executemany_mode`` flag. 

228 

229 

230.. _psycopg2_unicode: 

231 

232Unicode with Psycopg2 

233---------------------- 

234 

235By default, the psycopg2 driver uses the ``psycopg2.extensions.UNICODE`` 

236extension, such that the DBAPI receives and returns all strings as Python 

237Unicode objects directly - SQLAlchemy passes these values through without 

238change. Psycopg2 here will encode/decode string values based on the 

239current "client encoding" setting; by default this is the value in 

240the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``. 

241Typically, this can be changed to ``utf8``, as a more useful default:: 

242 

243 # postgresql.conf file 

244 

245 # client_encoding = sql_ascii # actually, defaults to database 

246 # encoding 

247 client_encoding = utf8 

248 

249A second way to affect the client encoding is to set it within Psycopg2 

250locally. SQLAlchemy will call psycopg2's 

251:meth:`psycopg2:connection.set_client_encoding` method 

252on all new connections based on the value passed to 

253:func:`_sa.create_engine` using the ``client_encoding`` parameter:: 

254 

255 # set_client_encoding() setting; 

256 # works for *all* PostgreSQL versions 

257 engine = create_engine("postgresql://user:pass@host/dbname", 

258 client_encoding='utf8') 

259 

260This overrides the encoding specified in the PostgreSQL client configuration. 

261When using the parameter in this way, the psycopg2 driver emits 

262``SET client_encoding TO 'utf8'`` on the connection explicitly, and works 

263in all PostgreSQL versions. 

264 

265Note that the ``client_encoding`` setting as passed to 

266:func:`_sa.create_engine` 

267is **not the same** as the more recently added ``client_encoding`` parameter 

268now supported by libpq directly. This is enabled when ``client_encoding`` 

269is passed directly to ``psycopg2.connect()``, and from SQLAlchemy is passed 

270using the :paramref:`_sa.create_engine.connect_args` parameter:: 

271 

272 engine = create_engine( 

273 "postgresql://user:pass@host/dbname", 

274 connect_args={'client_encoding': 'utf8'}) 

275 

276 # using the query string is equivalent 

277 engine = create_engine("postgresql://user:pass@host/dbname?client_encoding=utf8") 

278 

279The above parameter was only added to libpq as of version 9.1 of PostgreSQL, 

280so using the previous method is better for cross-version support. 

281 

282.. _psycopg2_disable_native_unicode: 

283 

284Disabling Native Unicode 

285^^^^^^^^^^^^^^^^^^^^^^^^ 

286 

287SQLAlchemy can also be instructed to skip the usage of the psycopg2 

288``UNICODE`` extension and to instead utilize its own unicode encode/decode 

289services, which are normally reserved only for those DBAPIs that don't 

290fully support unicode directly. Passing ``use_native_unicode=False`` to 

291:func:`_sa.create_engine` will disable usage of ``psycopg2.extensions. 

292UNICODE``. 

293SQLAlchemy will instead encode data itself into Python bytestrings on the way 

294in and coerce from bytes on the way back, 

295using the value of the :func:`_sa.create_engine` ``encoding`` parameter, which 

296defaults to ``utf-8``. 

297SQLAlchemy's own unicode encode/decode functionality is steadily becoming 

298obsolete as most DBAPIs now support unicode fully. 

299 

300Bound Parameter Styles 

301---------------------- 

302 

303The default parameter style for the psycopg2 dialect is "pyformat", where 

304SQL is rendered using ``%(paramname)s`` style. This format has the limitation 

305that it does not accommodate the unusual case of parameter names that 

306actually contain percent or parenthesis symbols; as SQLAlchemy in many cases 

307generates bound parameter names based on the name of a column, the presence 

308of these characters in a column name can lead to problems. 

309 

310There are two solutions to the issue of a :class:`_schema.Column` 

311that contains 

312one of these characters in its name. One is to specify the 

313:paramref:`.schema.Column.key` for columns that have such names:: 

314 

315 measurement = Table('measurement', metadata, 

316 Column('Size (meters)', Integer, key='size_meters') 

317 ) 

318 

319Above, an INSERT statement such as ``measurement.insert()`` will use 

320``size_meters`` as the parameter name, and a SQL expression such as 

321``measurement.c.size_meters > 10`` will derive the bound parameter name 

322from the ``size_meters`` key as well. 

323 

324.. versionchanged:: 1.0.0 - SQL expressions will use 

325 :attr:`_schema.Column.key` 

326 as the source of naming when anonymous bound parameters are created 

327 in SQL expressions; previously, this behavior only applied to 

328 :meth:`_schema.Table.insert` and :meth:`_schema.Table.update` 

329 parameter names. 

330 

331The other solution is to use a positional format; psycopg2 allows use of the 

332"format" paramstyle, which can be passed to 

333:paramref:`_sa.create_engine.paramstyle`:: 

334 

335 engine = create_engine( 

336 'postgresql://scott:tiger@localhost:5432/test', paramstyle='format') 

337 

338With the above engine, instead of a statement like:: 

339 

340 INSERT INTO measurement ("Size (meters)") VALUES (%(Size (meters))s) 

341 {'Size (meters)': 1} 

342 

343we instead see:: 

344 

345 INSERT INTO measurement ("Size (meters)") VALUES (%s) 

346 (1, ) 

347 

348Where above, the dictionary style is converted into a tuple with positional 

349style. 

350 

351 

352Transactions 

353------------ 

354 

355The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations. 

356 

357.. _psycopg2_isolation_level: 

358 

359Psycopg2 Transaction Isolation Level 

360------------------------------------- 

361 

362As discussed in :ref:`postgresql_isolation_level`, 

363all PostgreSQL dialects support setting of transaction isolation level 

364both via the ``isolation_level`` parameter passed to :func:`_sa.create_engine` 

365, 

366as well as the ``isolation_level`` argument used by 

367:meth:`_engine.Connection.execution_options`. When using the psycopg2 dialect 

368, these 

369options make use of psycopg2's ``set_isolation_level()`` connection method, 

370rather than emitting a PostgreSQL directive; this is because psycopg2's 

371API-level setting is always emitted at the start of each transaction in any 

372case. 

373 

374The psycopg2 dialect supports these constants for isolation level: 

375 

376* ``READ COMMITTED`` 

377* ``READ UNCOMMITTED`` 

378* ``REPEATABLE READ`` 

379* ``SERIALIZABLE`` 

380* ``AUTOCOMMIT`` 

381 

382.. seealso:: 

383 

384 :ref:`postgresql_isolation_level` 

385 

386 :ref:`pg8000_isolation_level` 

387 

388 

389NOTICE logging 

390--------------- 

391 

392The psycopg2 dialect will log PostgreSQL NOTICE messages 

393via the ``sqlalchemy.dialects.postgresql`` logger. When this logger 

394is set to the ``logging.INFO`` level, notice messages will be logged:: 

395 

396 import logging 

397 

398 logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO) 

399 

400Above, it is assumed that logging is configured externally. If this is not 

401the case, configuration such as ``logging.basicConfig()`` must be utilized:: 

402 

403 import logging 

404 

405 logging.basicConfig() # log messages to stdout 

406 logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO) 

407 

408.. seealso:: 

409 

410 `Logging HOWTO <https://docs.python.org/3/howto/logging.html>`_ - on the python.org website 

411 

412.. _psycopg2_hstore: 

413 

414HSTORE type 

415------------ 

416 

417The ``psycopg2`` DBAPI includes an extension to natively handle marshalling of 

418the HSTORE type. The SQLAlchemy psycopg2 dialect will enable this extension 

419by default when psycopg2 version 2.4 or greater is used, and 

420it is detected that the target database has the HSTORE type set up for use. 

421In other words, when the dialect makes the first 

422connection, a sequence like the following is performed: 

423 

4241. Request the available HSTORE oids using 

425 ``psycopg2.extras.HstoreAdapter.get_oids()``. 

426 If this function returns a list of HSTORE identifiers, we then determine 

427 that the ``HSTORE`` extension is present. 

428 This function is **skipped** if the version of psycopg2 installed is 

429 less than version 2.4. 

430 

4312. If the ``use_native_hstore`` flag is at its default of ``True``, and 

432 we've detected that ``HSTORE`` oids are available, the 

433 ``psycopg2.extensions.register_hstore()`` extension is invoked for all 

434 connections. 

435 

436The ``register_hstore()`` extension has the effect of **all Python 

437dictionaries being accepted as parameters regardless of the type of target 

438column in SQL**. The dictionaries are converted by this extension into a 

439textual HSTORE expression. If this behavior is not desired, disable the 

440use of the hstore extension by setting ``use_native_hstore`` to ``False`` as 

441follows:: 

442 

443 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", 

444 use_native_hstore=False) 

445 

446The ``HSTORE`` type is **still supported** when the 

447``psycopg2.extensions.register_hstore()`` extension is not used. It merely 

448means that the coercion between Python dictionaries and the HSTORE 

449string format, on both the parameter side and the result side, will take 

450place within SQLAlchemy's own marshalling logic, and not that of ``psycopg2`` 

451which may be more performant. 

452 

453""" # noqa 

454from __future__ import absolute_import 

455 

456import decimal 

457import logging 

458import re 

459 

460from .base import _DECIMAL_TYPES 

461from .base import _FLOAT_TYPES 

462from .base import _INT_TYPES 

463from .base import ENUM 

464from .base import PGCompiler 

465from .base import PGDialect 

466from .base import PGExecutionContext 

467from .base import PGIdentifierPreparer 

468from .base import UUID 

469from .hstore import HSTORE 

470from .json import JSON 

471from .json import JSONB 

472from ... import exc 

473from ... import processors 

474from ... import types as sqltypes 

475from ... import util 

476from ...engine import result as _result 

477from ...util import collections_abc 

478 

479try: 

480 from uuid import UUID as _python_UUID # noqa 

481except ImportError: 

482 _python_UUID = None 

483 

484 

485logger = logging.getLogger("sqlalchemy.dialects.postgresql") 

486 

487 

488class _PGNumeric(sqltypes.Numeric): 

489 def bind_processor(self, dialect): 

490 return None 

491 

492 def result_processor(self, dialect, coltype): 

493 if self.asdecimal: 

494 if coltype in _FLOAT_TYPES: 

495 return processors.to_decimal_processor_factory( 

496 decimal.Decimal, self._effective_decimal_return_scale 

497 ) 

498 elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: 

499 # pg8000 returns Decimal natively for 1700 

500 return None 

501 else: 

502 raise exc.InvalidRequestError( 

503 "Unknown PG numeric type: %d" % coltype 

504 ) 

505 else: 

506 if coltype in _FLOAT_TYPES: 

507 # pg8000 returns float natively for 701 

508 return None 

509 elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: 

510 return processors.to_float 

511 else: 

512 raise exc.InvalidRequestError( 

513 "Unknown PG numeric type: %d" % coltype 

514 ) 

515 

516 

517class _PGEnum(ENUM): 

518 def result_processor(self, dialect, coltype): 

519 if util.py2k and self._expect_unicode is True: 

520 # for py2k, if the enum type needs unicode data (which is set up as 

521 # part of the Enum() constructor based on values passed as py2k 

522 # unicode objects) we have to use our own converters since 

523 # psycopg2's don't work, a rare exception to the "modern DBAPIs 

524 # support unicode everywhere" theme of deprecating 

525 # convert_unicode=True. Use the special "force_nocheck" directive 

526 # which forces unicode conversion to happen on the Python side 

527 # without an isinstance() check. in py3k psycopg2 does the right 

528 # thing automatically. 

529 self._expect_unicode = "force_nocheck" 

530 return super(_PGEnum, self).result_processor(dialect, coltype) 

531 

532 

533class _PGHStore(HSTORE): 

534 def bind_processor(self, dialect): 

535 if dialect._has_native_hstore: 

536 return None 

537 else: 

538 return super(_PGHStore, self).bind_processor(dialect) 

539 

540 def result_processor(self, dialect, coltype): 

541 if dialect._has_native_hstore: 

542 return None 

543 else: 

544 return super(_PGHStore, self).result_processor(dialect, coltype) 

545 

546 

547class _PGJSON(JSON): 

548 def result_processor(self, dialect, coltype): 

549 if dialect._has_native_json: 

550 return None 

551 else: 

552 return super(_PGJSON, self).result_processor(dialect, coltype) 

553 

554 

555class _PGJSONB(JSONB): 

556 def result_processor(self, dialect, coltype): 

557 if dialect._has_native_jsonb: 

558 return None 

559 else: 

560 return super(_PGJSONB, self).result_processor(dialect, coltype) 

561 

562 

563class _PGUUID(UUID): 

564 def bind_processor(self, dialect): 

565 if not self.as_uuid and dialect.use_native_uuid: 

566 

567 def process(value): 

568 if value is not None: 

569 value = _python_UUID(value) 

570 return value 

571 

572 return process 

573 

574 def result_processor(self, dialect, coltype): 

575 if not self.as_uuid and dialect.use_native_uuid: 

576 

577 def process(value): 

578 if value is not None: 

579 value = str(value) 

580 return value 

581 

582 return process 

583 

584 

585_server_side_id = util.counter() 

586 

587 

588class PGExecutionContext_psycopg2(PGExecutionContext): 

589 def create_server_side_cursor(self): 

590 # use server-side cursors: 

591 # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html 

592 ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:]) 

593 return self._dbapi_connection.cursor(ident) 

594 

595 def get_result_proxy(self): 

596 self._log_notices(self.cursor) 

597 

598 if self._is_server_side: 

599 return _result.BufferedRowResultProxy(self) 

600 else: 

601 return _result.ResultProxy(self) 

602 

603 def _log_notices(self, cursor): 

604 # check also that notices is an iterable, after it's already 

605 # established that we will be iterating through it. This is to get 

606 # around test suites such as SQLAlchemy's using a Mock object for 

607 # cursor 

608 if not cursor.connection.notices or not isinstance( 

609 cursor.connection.notices, collections_abc.Iterable 

610 ): 

611 return 

612 

613 for notice in cursor.connection.notices: 

614 # NOTICE messages have a 

615 # newline character at the end 

616 logger.info(notice.rstrip()) 

617 

618 cursor.connection.notices[:] = [] 

619 

620 

621class PGCompiler_psycopg2(PGCompiler): 

622 pass 

623 

624 

625class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer): 

626 pass 

627 

628 

629EXECUTEMANY_DEFAULT = util.symbol("executemany_default") 

630EXECUTEMANY_BATCH = util.symbol("executemany_batch") 

631EXECUTEMANY_VALUES = util.symbol("executemany_values") 

632 

633 

634class PGDialect_psycopg2(PGDialect): 

635 driver = "psycopg2" 

636 if util.py2k: 

637 supports_unicode_statements = False 

638 

639 supports_server_side_cursors = True 

640 

641 default_paramstyle = "pyformat" 

642 # set to true based on psycopg2 version 

643 supports_sane_multi_rowcount = False 

644 execution_ctx_cls = PGExecutionContext_psycopg2 

645 statement_compiler = PGCompiler_psycopg2 

646 preparer = PGIdentifierPreparer_psycopg2 

647 psycopg2_version = (0, 0) 

648 

649 FEATURE_VERSION_MAP = dict( 

650 native_json=(2, 5), 

651 native_jsonb=(2, 5, 4), 

652 sane_multi_rowcount=(2, 0, 9), 

653 array_oid=(2, 4, 3), 

654 hstore_adapter=(2, 4), 

655 ) 

656 

657 _has_native_hstore = False 

658 _has_native_json = False 

659 _has_native_jsonb = False 

660 

661 engine_config_types = PGDialect.engine_config_types.union( 

662 [("use_native_unicode", util.asbool)] 

663 ) 

664 

665 colspecs = util.update_copy( 

666 PGDialect.colspecs, 

667 { 

668 sqltypes.Numeric: _PGNumeric, 

669 ENUM: _PGEnum, # needs force_unicode 

670 sqltypes.Enum: _PGEnum, # needs force_unicode 

671 HSTORE: _PGHStore, 

672 JSON: _PGJSON, 

673 sqltypes.JSON: _PGJSON, 

674 JSONB: _PGJSONB, 

675 UUID: _PGUUID, 

676 }, 

677 ) 

678 

679 @util.deprecated_params( 

680 use_batch_mode=( 

681 "1.3.7", 

682 "The psycopg2 use_batch_mode flag is superseded by " 

683 "executemany_mode='batch'", 

684 ) 

685 ) 

686 def __init__( 

687 self, 

688 server_side_cursors=False, 

689 use_native_unicode=True, 

690 client_encoding=None, 

691 use_native_hstore=True, 

692 use_native_uuid=True, 

693 executemany_mode=None, 

694 executemany_batch_page_size=None, 

695 executemany_values_page_size=None, 

696 use_batch_mode=None, 

697 **kwargs 

698 ): 

699 PGDialect.__init__(self, **kwargs) 

700 self.server_side_cursors = server_side_cursors 

701 self.use_native_unicode = use_native_unicode 

702 self.use_native_hstore = use_native_hstore 

703 self.use_native_uuid = use_native_uuid 

704 self.supports_unicode_binds = use_native_unicode 

705 self.client_encoding = client_encoding 

706 

707 # Parse executemany_mode argument, allowing it to be only one of the 

708 # symbol names 

709 self.executemany_mode = util.symbol.parse_user_argument( 

710 executemany_mode, 

711 { 

712 EXECUTEMANY_DEFAULT: [None], 

713 EXECUTEMANY_BATCH: ["batch"], 

714 EXECUTEMANY_VALUES: ["values"], 

715 }, 

716 "executemany_mode", 

717 ) 

718 if use_batch_mode: 

719 self.executemany_mode = EXECUTEMANY_BATCH 

720 

721 self.executemany_batch_page_size = executemany_batch_page_size 

722 self.executemany_values_page_size = executemany_values_page_size 

723 

724 if self.dbapi and hasattr(self.dbapi, "__version__"): 

725 m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__) 

726 if m: 

727 self.psycopg2_version = tuple( 

728 int(x) for x in m.group(1, 2, 3) if x is not None 

729 ) 

730 

731 def initialize(self, connection): 

732 super(PGDialect_psycopg2, self).initialize(connection) 

733 self._has_native_hstore = ( 

734 self.use_native_hstore 

735 and self._hstore_oids(connection.connection) is not None 

736 ) 

737 self._has_native_json = ( 

738 self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_json"] 

739 ) 

740 self._has_native_jsonb = ( 

741 self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_jsonb"] 

742 ) 

743 

744 # http://initd.org/psycopg/docs/news.html#what-s-new-in-psycopg-2-0-9 

745 self.supports_sane_multi_rowcount = ( 

746 self.psycopg2_version 

747 >= self.FEATURE_VERSION_MAP["sane_multi_rowcount"] 

748 and self.executemany_mode is EXECUTEMANY_DEFAULT 

749 ) 

750 

751 @classmethod 

752 def dbapi(cls): 

753 import psycopg2 

754 

755 return psycopg2 

756 

757 @classmethod 

758 def _psycopg2_extensions(cls): 

759 from psycopg2 import extensions 

760 

761 return extensions 

762 

763 @classmethod 

764 def _psycopg2_extras(cls): 

765 from psycopg2 import extras 

766 

767 return extras 

768 

769 @util.memoized_property 

770 def _isolation_lookup(self): 

771 extensions = self._psycopg2_extensions() 

772 return { 

773 "AUTOCOMMIT": extensions.ISOLATION_LEVEL_AUTOCOMMIT, 

774 "READ COMMITTED": extensions.ISOLATION_LEVEL_READ_COMMITTED, 

775 "READ UNCOMMITTED": extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, 

776 "REPEATABLE READ": extensions.ISOLATION_LEVEL_REPEATABLE_READ, 

777 "SERIALIZABLE": extensions.ISOLATION_LEVEL_SERIALIZABLE, 

778 } 

779 

780 def set_isolation_level(self, connection, level): 

781 try: 

782 level = self._isolation_lookup[level.replace("_", " ")] 

783 except KeyError as err: 

784 util.raise_( 

785 exc.ArgumentError( 

786 "Invalid value '%s' for isolation_level. " 

787 "Valid isolation levels for %s are %s" 

788 % (level, self.name, ", ".join(self._isolation_lookup)) 

789 ), 

790 replace_context=err, 

791 ) 

792 

793 connection.set_isolation_level(level) 

794 

795 def on_connect(self): 

796 extras = self._psycopg2_extras() 

797 extensions = self._psycopg2_extensions() 

798 

799 fns = [] 

800 if self.client_encoding is not None: 

801 

802 def on_connect(conn): 

803 conn.set_client_encoding(self.client_encoding) 

804 

805 fns.append(on_connect) 

806 

807 if self.isolation_level is not None: 

808 

809 def on_connect(conn): 

810 self.set_isolation_level(conn, self.isolation_level) 

811 

812 fns.append(on_connect) 

813 

814 if self.dbapi and self.use_native_uuid: 

815 

816 def on_connect(conn): 

817 extras.register_uuid(None, conn) 

818 

819 fns.append(on_connect) 

820 

821 if self.dbapi and self.use_native_unicode: 

822 

823 def on_connect(conn): 

824 extensions.register_type(extensions.UNICODE, conn) 

825 extensions.register_type(extensions.UNICODEARRAY, conn) 

826 

827 fns.append(on_connect) 

828 

829 if self.dbapi and self.use_native_hstore: 

830 

831 def on_connect(conn): 

832 hstore_oids = self._hstore_oids(conn) 

833 if hstore_oids is not None: 

834 oid, array_oid = hstore_oids 

835 kw = {"oid": oid} 

836 if util.py2k: 

837 kw["unicode"] = True 

838 if ( 

839 self.psycopg2_version 

840 >= self.FEATURE_VERSION_MAP["array_oid"] 

841 ): 

842 kw["array_oid"] = array_oid 

843 extras.register_hstore(conn, **kw) 

844 

845 fns.append(on_connect) 

846 

847 if self.dbapi and self._json_deserializer: 

848 

849 def on_connect(conn): 

850 if self._has_native_json: 

851 extras.register_default_json( 

852 conn, loads=self._json_deserializer 

853 ) 

854 if self._has_native_jsonb: 

855 extras.register_default_jsonb( 

856 conn, loads=self._json_deserializer 

857 ) 

858 

859 fns.append(on_connect) 

860 

861 if fns: 

862 

863 def on_connect(conn): 

864 for fn in fns: 

865 fn(conn) 

866 

867 return on_connect 

868 else: 

869 return None 

870 

871 def do_executemany(self, cursor, statement, parameters, context=None): 

872 if self.executemany_mode is EXECUTEMANY_DEFAULT: 

873 cursor.executemany(statement, parameters) 

874 return 

875 

876 if ( 

877 self.executemany_mode is EXECUTEMANY_VALUES 

878 and context 

879 and context.isinsert 

880 and context.compiled.insert_single_values_expr 

881 ): 

882 executemany_values = ( 

883 "(%s)" % context.compiled.insert_single_values_expr 

884 ) 

885 # guard for statement that was altered via event hook or similar 

886 if executemany_values not in statement: 

887 executemany_values = None 

888 else: 

889 executemany_values = None 

890 

891 if executemany_values: 

892 # Currently, SQLAlchemy does not pass "RETURNING" statements 

893 # into executemany(), since no DBAPI has ever supported that 

894 # until the introduction of psycopg2's executemany_values, so 

895 # we are not yet using the fetch=True flag. 

896 statement = statement.replace(executemany_values, "%s") 

897 if self.executemany_values_page_size: 

898 kwargs = {"page_size": self.executemany_values_page_size} 

899 else: 

900 kwargs = {} 

901 self._psycopg2_extras().execute_values( 

902 cursor, 

903 statement, 

904 parameters, 

905 template=executemany_values, 

906 **kwargs 

907 ) 

908 

909 else: 

910 if self.executemany_batch_page_size: 

911 kwargs = {"page_size": self.executemany_batch_page_size} 

912 else: 

913 kwargs = {} 

914 self._psycopg2_extras().execute_batch( 

915 cursor, statement, parameters, **kwargs 

916 ) 

917 

918 @util.memoized_instancemethod 

919 def _hstore_oids(self, conn): 

920 if self.psycopg2_version >= self.FEATURE_VERSION_MAP["hstore_adapter"]: 

921 extras = self._psycopg2_extras() 

922 oids = extras.HstoreAdapter.get_oids(conn) 

923 if oids is not None and oids[0]: 

924 return oids[0:2] 

925 return None 

926 

927 def create_connect_args(self, url): 

928 opts = url.translate_connect_args(username="user") 

929 if opts: 

930 if "port" in opts: 

931 opts["port"] = int(opts["port"]) 

932 opts.update(url.query) 

933 # send individual dbname, user, password, host, port 

934 # parameters to psycopg2.connect() 

935 return ([], opts) 

936 elif url.query: 

937 # any other connection arguments, pass directly 

938 opts.update(url.query) 

939 return ([], opts) 

940 else: 

941 # no connection arguments whatsoever; psycopg2.connect() 

942 # requires that "dsn" be present as a blank string. 

943 return ([""], opts) 

944 

945 def is_disconnect(self, e, connection, cursor): 

946 if isinstance(e, self.dbapi.Error): 

947 # check the "closed" flag. this might not be 

948 # present on old psycopg2 versions. Also, 

949 # this flag doesn't actually help in a lot of disconnect 

950 # situations, so don't rely on it. 

951 if getattr(connection, "closed", False): 

952 return True 

953 

954 # checks based on strings. in the case that .closed 

955 # didn't cut it, fall back onto these. 

956 str_e = str(e).partition("\n")[0] 

957 for msg in [ 

958 # these error messages from libpq: interfaces/libpq/fe-misc.c 

959 # and interfaces/libpq/fe-secure.c. 

960 "terminating connection", 

961 "closed the connection", 

962 "connection not open", 

963 "could not receive data from server", 

964 "could not send data to server", 

965 # psycopg2 client errors, psycopg2/conenction.h, 

966 # psycopg2/cursor.h 

967 "connection already closed", 

968 "cursor already closed", 

969 # not sure where this path is originally from, it may 

970 # be obsolete. It really says "losed", not "closed". 

971 "losed the connection unexpectedly", 

972 # these can occur in newer SSL 

973 "connection has been closed unexpectedly", 

974 "SSL SYSCALL error: Bad file descriptor", 

975 "SSL SYSCALL error: EOF detected", 

976 "SSL error: decryption failed or bad record mac", 

977 "SSL SYSCALL error: Operation timed out", 

978 ]: 

979 idx = str_e.find(msg) 

980 if idx >= 0 and '"' not in str_e[:idx]: 

981 return True 

982 return False 

983 

984 

985dialect = PGDialect_psycopg2