Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/base.py : 25%

# postgresql/base.py
# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
r"""
.. dialect:: postgresql
    :name: PostgreSQL

.. _postgresql_sequences:

Sequences/SERIAL/IDENTITY
-------------------------

PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
of creating new primary key values for integer-based primary key columns. When
creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
integer-based primary key columns, which generates a sequence and server side
default corresponding to the column.
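The DDL that results from this behavior can be inspected without a database connection by compiling a ``CreateTable`` construct against the PostgreSQL dialect (a minimal sketch; the table and column names here are arbitrary):

```python
from sqlalchemy import Table, Column, Integer, MetaData
from sqlalchemy.schema import CreateTable
from sqlalchemy.dialects import postgresql

metadata = MetaData()
t = Table("sometable", metadata, Column("id", Integer, primary_key=True))

# Compile the CREATE TABLE statement offline; the integer-based
# primary key column is rendered using the SERIAL datatype.
ddl = str(CreateTable(t).compile(dialect=postgresql.dialect()))
print(ddl)
```

Compiling against ``postgresql.dialect()`` in this way is a convenient technique for previewing dialect-specific DDL throughout this document.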
To specify a specific named sequence to be used for primary key generation,
use the :func:`~sqlalchemy.schema.Sequence` construct::

    Table('sometable', metadata,
        Column('id', Integer, Sequence('some_id_seq'), primary_key=True)
    )
When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
having the "last insert identifier" available, a RETURNING clause is added to
the INSERT statement which specifies the primary key columns should be
returned after the statement completes. The RETURNING functionality only takes
place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
sequence, whether specified explicitly or implicitly via ``SERIAL``, is
executed independently beforehand, the returned value to be used in the
subsequent insert. Note that when an
:func:`~sqlalchemy.sql.expression.insert()` construct is executed using
"executemany" semantics, the "last inserted identifier" functionality does not
apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
case.
To disable the usage of RETURNING by default, specify the flag
``implicit_returning=False`` to :func:`_sa.create_engine`.
PostgreSQL 10 IDENTITY columns
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL 10 has a new IDENTITY feature that supersedes the use of SERIAL.
Built-in support for rendering of IDENTITY is not available yet, however the
following compilation hook may be used to replace occurrences of SERIAL with
IDENTITY::

    from sqlalchemy.schema import CreateColumn
    from sqlalchemy.ext.compiler import compiles


    @compiles(CreateColumn, 'postgresql')
    def use_identity(element, compiler, **kw):
        text = compiler.visit_create_column(element, **kw)
        text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY")
        return text

Using the above, a table such as::

    t = Table(
        't', m,
        Column('id', Integer, primary_key=True),
        Column('data', String)
    )

will generate on the backing database as::

    CREATE TABLE t (
        id INT GENERATED BY DEFAULT AS IDENTITY NOT NULL,
        data VARCHAR,
        PRIMARY KEY (id)
    )
.. _postgresql_isolation_level:

Transaction Isolation Level
---------------------------

All PostgreSQL dialects support setting of transaction isolation level
both via a dialect-specific parameter
:paramref:`_sa.create_engine.isolation_level` accepted by
:func:`_sa.create_engine`,
as well as the :paramref:`.Connection.execution_options.isolation_level`
argument as passed to :meth:`_engine.Connection.execution_options`.
When using a non-psycopg2 dialect, this feature works by issuing the command
``SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL <level>`` for
each new connection. For the special AUTOCOMMIT isolation level,
DBAPI-specific techniques are used.

To set isolation level using :func:`_sa.create_engine`::

    engine = create_engine(
        "postgresql+pg8000://scott:tiger@localhost/test",
        isolation_level="READ UNCOMMITTED"
    )

To set using per-connection execution options::

    connection = engine.connect()
    connection = connection.execution_options(
        isolation_level="READ COMMITTED"
    )

Valid values for ``isolation_level`` include:

* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``AUTOCOMMIT`` - on psycopg2 / pg8000 only

.. seealso::

    :ref:`psycopg2_isolation_level`

    :ref:`pg8000_isolation_level`
126Remote-Schema Table Introspection and PostgreSQL search_path
127------------------------------------------------------------
129**TL;DR;**: keep the ``search_path`` variable set to its default of ``public``,
130name schemas **other** than ``public`` explicitly within ``Table`` definitions.
132The PostgreSQL dialect can reflect tables from any schema. The
133:paramref:`_schema.Table.schema` argument, or alternatively the
134:paramref:`.MetaData.reflect.schema` argument determines which schema will
135be searched for the table or tables. The reflected :class:`_schema.Table`
136objects
137will in all cases retain this ``.schema`` attribute as was specified.
138However, with regards to tables which these :class:`_schema.Table`
139objects refer to
140via foreign key constraint, a decision must be made as to how the ``.schema``
141is represented in those remote tables, in the case where that remote
142schema name is also a member of the current
143`PostgreSQL search path
144<http://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_.
146By default, the PostgreSQL dialect mimics the behavior encouraged by
147PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
148returns a sample definition for a particular foreign key constraint,
149omitting the referenced schema name from that definition when the name is
150also in the PostgreSQL schema search path. The interaction below
151illustrates this behavior::
    test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
    CREATE TABLE
    test=> CREATE TABLE referring(
    test(>         id INTEGER PRIMARY KEY,
    test(>         referred_id INTEGER REFERENCES test_schema.referred(id));
    CREATE TABLE
    test=> SET search_path TO public, test_schema;
    test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
    test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
    test-> ON n.oid = c.relnamespace
    test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
    test-> WHERE c.relname='referring' AND r.contype = 'f'
    test-> ;
                   pg_get_constraintdef
    ---------------------------------------------------
     FOREIGN KEY (referred_id) REFERENCES referred(id)
    (1 row)

Above, we created a table ``referred`` as a member of the remote schema
``test_schema``, however when we added ``test_schema`` to the
PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
the function.
On the other hand, if we set the search path back to the typical default
of ``public``::

    test=> SET search_path TO public;
    SET

The same query against ``pg_get_constraintdef()`` now returns the fully
schema-qualified name for us::

    test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
    test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
    test-> ON n.oid = c.relnamespace
    test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
    test-> WHERE c.relname='referring' AND r.contype = 'f';
                         pg_get_constraintdef
    ---------------------------------------------------------------
     FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
    (1 row)

SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
in order to determine the remote schema name. That is, if our ``search_path``
were set to include ``test_schema``, and we invoked a table
reflection process as follows::
    >>> from sqlalchemy import Table, MetaData, create_engine
    >>> engine = create_engine("postgresql://scott:tiger@localhost/test")
    >>> with engine.connect() as conn:
    ...     conn.execute("SET search_path TO test_schema, public")
    ...     meta = MetaData()
    ...     referring = Table('referring', meta,
    ...                       autoload=True, autoload_with=conn)
    ...
    <sqlalchemy.engine.result.ResultProxy object at 0x101612ed0>

The above process would deliver to the :attr:`_schema.MetaData.tables`
collection the ``referred`` table named **without** the schema::

    >>> meta.tables['referred'].schema is None
    True
To alter the behavior of reflection such that the referred schema is
maintained regardless of the ``search_path`` setting, use the
``postgresql_ignore_search_path`` option, which can be specified as a
dialect-specific argument to both :class:`_schema.Table` as well as
:meth:`_schema.MetaData.reflect`::

    >>> with engine.connect() as conn:
    ...     conn.execute("SET search_path TO test_schema, public")
    ...     meta = MetaData()
    ...     referring = Table('referring', meta, autoload=True,
    ...                       autoload_with=conn,
    ...                       postgresql_ignore_search_path=True)
    ...
    <sqlalchemy.engine.result.ResultProxy object at 0x1016126d0>

We will now have ``test_schema.referred`` stored as schema-qualified::

    >>> meta.tables['test_schema.referred'].schema
    'test_schema'
.. sidebar:: Best Practices for PostgreSQL Schema reflection

    The description of PostgreSQL schema reflection behavior is complex, and
    is the product of many years of dealing with widely varied use cases and
    user preferences. But in fact, there's no need to understand any of it if
    you just stick to the simplest use pattern: leave the ``search_path`` set
    to its default of ``public`` only, never refer to the name ``public`` as
    an explicit schema name otherwise, and refer to all other schema names
    explicitly when building up a :class:`_schema.Table` object. The options
    described here are only for those users who can't, or prefer not to, stay
    within these guidelines.

Note that **in all cases**, the "default" schema is always reflected as
``None``. The "default" schema on PostgreSQL is that which is returned by the
PostgreSQL ``current_schema()`` function. On a typical PostgreSQL
installation, this is the name ``public``. So a table that refers to another
which is in the ``public`` (i.e. default) schema will always have the
``.schema`` attribute set to ``None``.

.. versionadded:: 0.9.2 Added the ``postgresql_ignore_search_path``
    dialect-level option accepted by :class:`_schema.Table` and
    :meth:`_schema.MetaData.reflect`.

.. seealso::

    `The Schema Search Path
    <http://www.postgresql.org/docs/9.0/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
    - on the PostgreSQL website.
INSERT/UPDATE...RETURNING
-------------------------

The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
for single-row INSERT statements in order to fetch newly generated
primary key identifiers. To specify an explicit ``RETURNING`` clause,
use the :meth:`._UpdateBase.returning` method on a per-statement basis::

    # INSERT..RETURNING
    result = table.insert().returning(table.c.col1, table.c.col2).\
        values(name='foo')
    print(result.fetchall())

    # UPDATE..RETURNING
    result = table.update().returning(table.c.col1, table.c.col2).\
        where(table.c.name=='foo').values(name='bar')
    print(result.fetchall())

    # DELETE..RETURNING
    result = table.delete().returning(table.c.col1, table.c.col2).\
        where(table.c.name=='foo')
    print(result.fetchall())
.. _postgresql_insert_on_conflict:

INSERT...ON CONFLICT (Upsert)
------------------------------

Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
candidate row will only be inserted if that row does not violate any unique
constraints. In the case of a unique constraint violation, a secondary action
can occur which can be either "DO UPDATE", indicating that the data in the
target row should be updated, or "DO NOTHING", which indicates to silently skip
this row.

Conflicts are determined using existing unique constraints and indexes. These
constraints may be identified either using their name as stated in DDL,
or they may be *inferred* by stating the columns and conditions that comprise
the indexes.

SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
:func:`_postgresql.insert()` function, which provides
the generative methods :meth:`~.postgresql.Insert.on_conflict_do_update`
and :meth:`~.postgresql.Insert.on_conflict_do_nothing`::

    from sqlalchemy.dialects.postgresql import insert

    insert_stmt = insert(my_table).values(
        id='some_existing_id',
        data='inserted value')

    do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
        index_elements=['id']
    )

    conn.execute(do_nothing_stmt)

    do_update_stmt = insert_stmt.on_conflict_do_update(
        constraint='pk_my_table',
        set_=dict(data='updated value')
    )

    conn.execute(do_update_stmt)
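The SQL that these methods generate can be previewed without a database by compiling the statement against the PostgreSQL dialect (a sketch; the table layout here is arbitrary):

```python
from sqlalchemy import Table, Column, String, MetaData
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()
my_table = Table("my_table", metadata,
                 Column("id", String, primary_key=True),
                 Column("data", String))

stmt = insert(my_table).values(id="some_existing_id", data="inserted value")
do_nothing_stmt = stmt.on_conflict_do_nothing(index_elements=["id"])

# Compiling shows the ON CONFLICT clause rendered by the dialect
sql = str(do_nothing_stmt.compile(dialect=postgresql.dialect()))
print(sql)
```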
Both methods supply the "target" of the conflict using either the
named constraint or by column inference:

* The :paramref:`.Insert.on_conflict_do_update.index_elements` argument
  specifies a sequence containing string column names, :class:`_schema.Column`
  objects, and/or SQL expression elements, which would identify a unique
  index::

      do_update_stmt = insert_stmt.on_conflict_do_update(
          index_elements=['id'],
          set_=dict(data='updated value')
      )

      do_update_stmt = insert_stmt.on_conflict_do_update(
          index_elements=[my_table.c.id],
          set_=dict(data='updated value')
      )
* When using :paramref:`.Insert.on_conflict_do_update.index_elements` to
  infer an index, a partial index can be inferred by also specifying the
  :paramref:`.Insert.on_conflict_do_update.index_where` parameter::
      from sqlalchemy.dialects.postgresql import insert

      stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
      stmt = stmt.on_conflict_do_update(
          index_elements=[my_table.c.user_email],
          index_where=my_table.c.user_email.like('%@gmail.com'),
          set_=dict(data=stmt.excluded.data)
      )
      conn.execute(stmt)
* The :paramref:`.Insert.on_conflict_do_update.constraint` argument is
  used to specify an index directly rather than inferring it. This can be
  the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX::

      do_update_stmt = insert_stmt.on_conflict_do_update(
          constraint='my_table_idx_1',
          set_=dict(data='updated value')
      )

      do_update_stmt = insert_stmt.on_conflict_do_update(
          constraint='my_table_pk',
          set_=dict(data='updated value')
      )

* The :paramref:`.Insert.on_conflict_do_update.constraint` argument may
  also refer to a SQLAlchemy construct representing a constraint,
  e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
  :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
  if the constraint has a name, it is used directly. Otherwise, if the
  constraint is unnamed, then inference will be used, where the expressions
  and optional WHERE clause of the constraint will be spelled out in the
  construct. This use is especially convenient
  to refer to the named or unnamed primary key of a :class:`_schema.Table`
  using the :attr:`_schema.Table.primary_key` attribute::

      do_update_stmt = insert_stmt.on_conflict_do_update(
          constraint=my_table.primary_key,
          set_=dict(data='updated value')
      )

``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
existing row, using any combination of new values as well as values
from the proposed insertion. These values are specified using the
:paramref:`.Insert.on_conflict_do_update.set_` parameter. This
parameter accepts a dictionary which consists of direct values
for UPDATE::

    from sqlalchemy.dialects.postgresql import insert

    stmt = insert(my_table).values(id='some_id', data='inserted value')
    do_update_stmt = stmt.on_conflict_do_update(
        index_elements=['id'],
        set_=dict(data='updated value')
    )
    conn.execute(do_update_stmt)

.. warning::

    The :meth:`_expression.Insert.on_conflict_do_update`
    method does **not** take into
    account Python-side default UPDATE values or generation functions, e.g.
    those specified using :paramref:`_schema.Column.onupdate`.
    These values will not be exercised for an ON CONFLICT style of UPDATE,
    unless they are manually specified in the
    :paramref:`.Insert.on_conflict_do_update.set_` dictionary.
In order to refer to the proposed insertion row, the special alias
:attr:`~.postgresql.Insert.excluded` is available as an attribute on
the :class:`_postgresql.Insert` object; this object is a
:class:`_expression.ColumnCollection` alias that contains all columns of the
target table::
    from sqlalchemy.dialects.postgresql import insert

    stmt = insert(my_table).values(
        id='some_id',
        data='inserted value',
        author='jlh')
    do_update_stmt = stmt.on_conflict_do_update(
        index_elements=['id'],
        set_=dict(data='updated value', author=stmt.excluded.author)
    )
    conn.execute(do_update_stmt)
The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
a WHERE clause using the :paramref:`.Insert.on_conflict_do_update.where`
parameter, which will limit those rows which receive an UPDATE::

    from sqlalchemy.dialects.postgresql import insert

    stmt = insert(my_table).values(
        id='some_id',
        data='inserted value',
        author='jlh')
    on_update_stmt = stmt.on_conflict_do_update(
        index_elements=['id'],
        set_=dict(data='updated value', author=stmt.excluded.author),
        where=(my_table.c.status == 2)
    )
    conn.execute(on_update_stmt)
``ON CONFLICT`` may also be used to skip inserting a row entirely
if any conflict with a unique or exclusion constraint occurs; below
this is illustrated using the
:meth:`~.postgresql.Insert.on_conflict_do_nothing` method::

    from sqlalchemy.dialects.postgresql import insert

    stmt = insert(my_table).values(id='some_id', data='inserted value')
    stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
    conn.execute(stmt)

If ``DO NOTHING`` is used without specifying any columns or constraint,
it has the effect of skipping the INSERT for any unique or exclusion
constraint violation which occurs::

    from sqlalchemy.dialects.postgresql import insert

    stmt = insert(my_table).values(id='some_id', data='inserted value')
    stmt = stmt.on_conflict_do_nothing()
    conn.execute(stmt)

.. versionadded:: 1.1 Added support for PostgreSQL ON CONFLICT clauses

.. seealso::

    `INSERT .. ON CONFLICT
    <http://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
    - in the PostgreSQL documentation.
.. _postgresql_match:

Full Text Search
----------------

SQLAlchemy makes available the PostgreSQL ``@@`` operator via the
:meth:`_expression.ColumnElement.match` method on any textual column
expression. On a PostgreSQL dialect, an expression like the following::

    select([sometable.c.text.match("search string")])

will emit to the database::

    SELECT text @@ to_tsquery('search string') FROM table

The PostgreSQL text search functions such as ``to_tsquery()``
and ``to_tsvector()`` are available
explicitly using the standard :data:`.func` construct. For example::

    select([
        func.to_tsvector('fat cats ate rats').match('cat & rat')
    ])

Emits the equivalent of::

    SELECT to_tsvector('fat cats ate rats') @@ to_tsquery('cat & rat')

The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST::

    from sqlalchemy.dialects.postgresql import TSVECTOR
    from sqlalchemy import select, cast
    select([cast("some text", TSVECTOR)])

produces a statement equivalent to::

    SELECT CAST('some text' AS TSVECTOR) AS anon_1

Full Text Searches in PostgreSQL are influenced by a combination of: the
PostgreSQL setting of ``default_text_search_config``, the ``regconfig`` used
to build the GIN/GiST indexes, and the ``regconfig`` optionally passed in
during a query.

When performing a Full Text Search against a column that has a GIN or
GiST index that is already pre-computed (which is common on full text
searches) one may need to explicitly pass in a particular PostgreSQL
``regconfig`` value to ensure the query-planner utilizes the index and does
not re-compute the column on demand.

In order to provide for this explicit query planning, or to use different
search strategies, the ``match`` method accepts a ``postgresql_regconfig``
keyword argument::

    select([mytable.c.id]).where(
        mytable.c.title.match('somestring', postgresql_regconfig='english')
    )

Emits the equivalent of::

    SELECT mytable.id FROM mytable
    WHERE mytable.title @@ to_tsquery('english', 'somestring')

One can also specifically pass in a ``'regconfig'`` value to the
``to_tsvector()`` command as the initial argument::

    select([mytable.c.id]).where(
        func.to_tsvector('english', mytable.c.title)\
            .match('somestring', postgresql_regconfig='english')
    )

produces a statement equivalent to::

    SELECT mytable.id FROM mytable
    WHERE to_tsvector('english', mytable.title) @@
        to_tsquery('english', 'somestring')

It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
PostgreSQL to ensure that you are generating queries with SQLAlchemy that
take full advantage of any indexes you may have created for full text search.
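The rendered form of the ``match()`` operator, including the ``regconfig`` argument, can be checked by compiling the expression alone against the PostgreSQL dialect (a sketch; the table and column names are arbitrary, and the exact ``*_tsquery`` function rendered may vary by SQLAlchemy version):

```python
from sqlalchemy import Table, Column, Integer, String, MetaData
from sqlalchemy.dialects import postgresql

metadata = MetaData()
mytable = Table("mytable", metadata,
                Column("id", Integer), Column("title", String))

# compile just the match() expression to see the @@ operator rendered
expr = mytable.c.title.match("somestring", postgresql_regconfig="english")
match_sql = str(expr.compile(dialect=postgresql.dialect()))
print(match_sql)
```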
FROM ONLY ...
-------------

The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
table in an inheritance hierarchy. This can be used to produce the
``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
syntaxes. It uses SQLAlchemy's hints mechanism::

    # SELECT ... FROM ONLY ...
    result = table.select().with_hint(table, 'ONLY', 'postgresql')
    print(result.fetchall())

    # UPDATE ONLY ...
    table.update(values=dict(foo='bar')).with_hint('ONLY',
                                                   dialect_name='postgresql')

    # DELETE FROM ONLY ...
    table.delete().with_hint('ONLY', dialect_name='postgresql')
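The effect of the ``ONLY`` hint on the FROM clause can be verified offline by compiling the hinted SELECT against the PostgreSQL dialect (a minimal sketch with an arbitrary table):

```python
from sqlalchemy import Table, Column, Integer, MetaData
from sqlalchemy.dialects import postgresql

metadata = MetaData()
table = Table("mytable", metadata, Column("id", Integer))

# the hint is applied only when compiling with the postgresql dialect
stmt = table.select().with_hint(table, "ONLY", "postgresql")
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```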
.. _postgresql_indexes:

PostgreSQL-Specific Index Options
---------------------------------

Several extensions to the :class:`.Index` construct are available, specific
to the PostgreSQL dialect.

.. _postgresql_partial_indexes:

Partial Indexes
^^^^^^^^^^^^^^^

Partial indexes add criterion to the index definition so that the index is
applied to a subset of rows. These can be specified on :class:`.Index`
using the ``postgresql_where`` keyword argument::

    Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10)
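The DDL produced by ``postgresql_where`` can be previewed by compiling a ``CreateIndex`` construct against the PostgreSQL dialect (a sketch; table and column names are arbitrary):

```python
from sqlalchemy import Table, Column, Integer, MetaData, Index
from sqlalchemy.schema import CreateIndex
from sqlalchemy.dialects import postgresql

metadata = MetaData()
my_table = Table("my_table", metadata,
                 Column("id", Integer), Column("value", Integer))

idx = Index("my_index", my_table.c.id,
            postgresql_where=my_table.c.value > 10)

# the WHERE criterion is rendered at the end of the CREATE INDEX statement
partial_ddl = str(CreateIndex(idx).compile(dialect=postgresql.dialect()))
print(partial_ddl)
```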
Operator Classes
^^^^^^^^^^^^^^^^

PostgreSQL allows the specification of an *operator class* for each column of
an index (see
http://www.postgresql.org/docs/8.3/interactive/indexes-opclass.html).
The :class:`.Index` construct allows these to be specified via the
``postgresql_ops`` keyword argument::

    Index(
        'my_index', my_table.c.id, my_table.c.data,
        postgresql_ops={
            'data': 'text_pattern_ops',
            'id': 'int4_ops'
        })

Note that the keys in the ``postgresql_ops`` dictionary are the "key" name of
the :class:`_schema.Column`, i.e. the name used to access it from the ``.c``
collection of :class:`_schema.Table`, which can be configured to be different
than the actual name of the column as expressed in the database.

If ``postgresql_ops`` is to be used against a complex SQL expression such
as a function call, then to apply to the column it must be given a label
that is identified in the dictionary by name, e.g.::

    Index(
        'my_index', my_table.c.id,
        func.lower(my_table.c.data).label('data_lower'),
        postgresql_ops={
            'data_lower': 'text_pattern_ops',
            'id': 'int4_ops'
        })
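The operator classes appear in the rendered DDL immediately after each column, which can be confirmed by compiling ``CreateIndex`` offline (a sketch with arbitrary names):

```python
from sqlalchemy import Table, Column, Integer, String, MetaData, Index
from sqlalchemy.schema import CreateIndex
from sqlalchemy.dialects import postgresql

metadata = MetaData()
my_table = Table("my_table", metadata,
                 Column("id", Integer), Column("data", String))

idx = Index("my_index", my_table.c.id, my_table.c.data,
            postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"})

# each opclass follows its column inside the parenthesized column list
ops_ddl = str(CreateIndex(idx).compile(dialect=postgresql.dialect()))
print(ops_ddl)
```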
Index Types
^^^^^^^^^^^

PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
as the ability for users to create their own (see
http://www.postgresql.org/docs/8.3/static/indexes-types.html). These can be
specified on :class:`.Index` using the ``postgresql_using`` keyword argument::

    Index('my_index', my_table.c.data, postgresql_using='gin')

The value passed to the keyword argument will be simply passed through to the
underlying CREATE INDEX command, so it *must* be a valid index type for your
version of PostgreSQL.
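The pass-through behavior can be seen in the compiled DDL, where the value lands in a ``USING`` clause (a sketch; here a GIN index over an ARRAY column, with arbitrary names):

```python
from sqlalchemy import Table, Column, Integer, MetaData, Index
from sqlalchemy.schema import CreateIndex
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import ARRAY

metadata = MetaData()
my_table = Table("my_table", metadata, Column("data", ARRAY(Integer)))

idx = Index("my_index", my_table.c.data, postgresql_using="gin")

# the string is emitted verbatim after the USING keyword
gin_ddl = str(CreateIndex(idx).compile(dialect=postgresql.dialect()))
print(gin_ddl)
```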
.. _postgresql_index_storage:

Index Storage Parameters
^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL allows storage parameters to be set on indexes. The storage
parameters available depend on the index method used by the index. Storage
parameters can be specified on :class:`.Index` using the ``postgresql_with``
keyword argument::

    Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50})

.. versionadded:: 1.0.6

PostgreSQL also allows the tablespace in which to create the index to be
specified. The tablespace can be specified on :class:`.Index` using the
``postgresql_tablespace`` keyword argument::

    Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace')

.. versionadded:: 1.1

Note that the same option is available on :class:`_schema.Table` as well.
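Both options can be combined on a single :class:`.Index` and inspected in the compiled DDL (a sketch; the parameter and tablespace names are arbitrary):

```python
from sqlalchemy import Table, Column, String, MetaData, Index
from sqlalchemy.schema import CreateIndex
from sqlalchemy.dialects import postgresql

metadata = MetaData()
my_table = Table("my_table", metadata, Column("data", String))

idx = Index("my_index", my_table.c.data,
            postgresql_with={"fillfactor": 50},
            postgresql_tablespace="my_tablespace")

# WITH (...) renders before the TABLESPACE clause
storage_ddl = str(CreateIndex(idx).compile(dialect=postgresql.dialect()))
print(storage_ddl)
```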
.. _postgresql_index_concurrently:

Indexes with CONCURRENTLY
^^^^^^^^^^^^^^^^^^^^^^^^^

The PostgreSQL index option CONCURRENTLY is supported by passing the
flag ``postgresql_concurrently`` to the :class:`.Index` construct::

    tbl = Table('testtbl', m, Column('data', Integer))

    idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True)

The above index construct will render DDL for CREATE INDEX, assuming
PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as::

    CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)

For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
a connection-less dialect, it will emit::

    DROP INDEX CONCURRENTLY test_idx1

.. versionadded:: 1.1 support for CONCURRENTLY on DROP INDEX. The
    CONCURRENTLY keyword is now only emitted if a high enough version
    of PostgreSQL is detected on the connection (or for a connection-less
    dialect).

When using CONCURRENTLY, the PostgreSQL database requires that the statement
be invoked outside of a transaction block. The Python DBAPI enforces that
even for a single statement, a transaction is present, so to use this
construct, the DBAPI's "autocommit" mode must be used::

    metadata = MetaData()
    table = Table(
        "foo", metadata,
        Column("id", String))
    index = Index(
        "foo_idx", table.c.id, postgresql_concurrently=True)

    with engine.connect() as conn:
        with conn.execution_options(isolation_level='AUTOCOMMIT'):
            table.create(conn)

.. seealso::

    :ref:`postgresql_isolation_level`
.. _postgresql_index_reflection:

PostgreSQL Index Reflection
---------------------------

The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
UNIQUE CONSTRAINT construct is used. When inspecting a table using
:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
and the :meth:`_reflection.Inspector.get_unique_constraints` methods will
report on these two constructs distinctly; in the case of the index, the key
``duplicates_constraint`` will be present in the index entry if it is
detected as mirroring a constraint. When performing reflection using
``Table(..., autoload=True)``, the UNIQUE INDEX is **not** returned
in :attr:`_schema.Table.indexes` when it is detected as mirroring a
:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints`
collection.

.. versionchanged:: 1.0.0 - :class:`_schema.Table` reflection now includes
    :class:`.UniqueConstraint` objects present in the
    :attr:`_schema.Table.constraints` collection; the PostgreSQL backend will
    no longer include a "mirrored" :class:`.Index` construct in
    :attr:`_schema.Table.indexes` if it is detected as corresponding to a
    unique constraint.
Special Reflection Options
--------------------------

The :class:`_reflection.Inspector` used for the PostgreSQL backend is an
instance of :class:`.PGInspector`, which offers additional methods::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("postgresql+psycopg2://localhost/test")
    insp = inspect(engine)  # will be a PGInspector

    print(insp.get_enums())

.. autoclass:: PGInspector
    :members:
.. _postgresql_table_options:

PostgreSQL Table Options
------------------------

Several options for CREATE TABLE are supported directly by the PostgreSQL
dialect in conjunction with the :class:`_schema.Table` construct:

* ``TABLESPACE``::

    Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace')

  The above option is also available on the :class:`.Index` construct.

* ``ON COMMIT``::

    Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS')

* ``WITH OIDS``::

    Table("some_table", metadata, ..., postgresql_with_oids=True)

* ``WITHOUT OIDS``::

    Table("some_table", metadata, ..., postgresql_with_oids=False)

* ``INHERITS``::

    Table("some_table", metadata, ..., postgresql_inherits="some_supertable")

    Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))

  .. versionadded:: 1.0.0

* ``PARTITION BY``::

    Table("some_table", metadata, ...,
          postgresql_partition_by='LIST (part_column)')

  .. versionadded:: 1.2.6
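These options are appended after the closing parenthesis of the CREATE TABLE statement, which can be confirmed offline by compiling ``CreateTable`` against the PostgreSQL dialect (a sketch combining two of the options above; names are arbitrary):

```python
from sqlalchemy import Table, Column, Integer, MetaData
from sqlalchemy.schema import CreateTable
from sqlalchemy.dialects import postgresql

metadata = MetaData()
some_table = Table("some_table", metadata,
                   Column("part_column", Integer),
                   postgresql_partition_by="LIST (part_column)",
                   postgresql_tablespace="some_tablespace")

# PARTITION BY and TABLESPACE appear after the column definitions
table_ddl = str(CreateTable(some_table).compile(dialect=postgresql.dialect()))
print(table_ddl)
```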
.. seealso::

    `PostgreSQL CREATE TABLE options
    <http://www.postgresql.org/docs/current/static/sql-createtable.html>`_
Table values, Row and Tuple objects
-----------------------------------

Row Types
^^^^^^^^^

Built-in support for rendering a ``ROW`` is not available yet, however the
:func:`_expression.tuple_` construct may be used in its place. Another
alternative is to use the :attr:`_sa.func` generator with ``func.ROW``::

    table.select().where(
        tuple_(table.c.id, table.c.fk) > (1, 2)
    ).where(func.ROW(table.c.id, table.c.fk) < func.ROW(3, 7))

This will generate the row-wise comparison::

    SELECT *
    FROM table
    WHERE (id, fk) > (1, 2)
    AND ROW(id, fk) < ROW(3, 7)
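The two spellings can be compared offline by compiling the expressions against the PostgreSQL dialect (a sketch; the table layout is arbitrary):

```python
from sqlalchemy import Table, Column, Integer, MetaData, tuple_, func
from sqlalchemy.dialects import postgresql

metadata = MetaData()
table1 = Table("table1", metadata,
               Column("id", Integer), Column("fk", Integer))

# tuple_() renders a bare parenthesized row value
tuple_sql = str(
    (tuple_(table1.c.id, table1.c.fk) > (1, 2))
    .compile(dialect=postgresql.dialect()))

# func.ROW() renders the explicit ROW(...) constructor
row_sql = str(
    (func.ROW(table1.c.id, table1.c.fk) < func.ROW(3, 7))
    .compile(dialect=postgresql.dialect()))
print(tuple_sql)
print(row_sql)
```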
.. seealso::

    `PostgreSQL Row Constructors
    <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_

    `PostgreSQL Row Constructor Comparison
    <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_
Table Types
^^^^^^^^^^^

PostgreSQL also supports passing a table as an argument to a function. This
is not available yet in SQLAlchemy, however the
:func:`_expression.literal_column` function with the name of the table may be
used in its place::

    select(['*']).select_from(func.my_function(literal_column('my_table')))

Will generate the SQL::

    SELECT *
    FROM my_function(my_table)
ARRAY Types
-----------

The PostgreSQL dialect supports arrays, both as multidimensional column types
as well as array literals:

* :class:`_postgresql.ARRAY` - ARRAY datatype

* :class:`_postgresql.array` - array literal

* :func:`_postgresql.array_agg` - ARRAY_AGG SQL function

* :class:`_postgresql.aggregate_order_by` - helper for PG's ORDER BY aggregate
  function syntax.
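A brief sketch of the first two constructs, checked offline against the PostgreSQL dialect (table and column names here are arbitrary):

```python
from sqlalchemy import Table, Column, Integer, MetaData
from sqlalchemy.schema import CreateTable
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import ARRAY, array

metadata = MetaData()
t = Table("t", metadata, Column("data", ARRAY(Integer)))

# the ARRAY column type renders using PostgreSQL [] syntax in DDL
array_ddl = str(CreateTable(t).compile(dialect=postgresql.dialect()))

# array() produces an ARRAY[...] literal expression
literal_sql = str(array([1, 2, 3]).compile(dialect=postgresql.dialect()))
print(array_ddl)
print(literal_sql)
```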
JSON Types
----------

The PostgreSQL dialect supports both JSON and JSONB datatypes, including
psycopg2's native support and support for all of PostgreSQL's special
operators:

* :class:`_postgresql.JSON`

* :class:`_postgresql.JSONB`
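As a hedged sketch of the special operators (table and key names are
invented for illustration), indexed access on a JSONB column renders the
``->`` operator, and ``.astext`` renders ``->>``:

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import JSONB

metadata = MetaData()
# hypothetical table for illustration
docs = Table(
    "docs",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("body", JSONB),
)

# indexed access + .astext compiles to the ->> (text extraction) operator
expr = docs.c.body["title"].astext
sql = str(expr.compile(dialect=postgresql.dialect()))
print(sql)
```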
HSTORE Type
-----------

The PostgreSQL HSTORE type as well as hstore literals are supported:

* :class:`_postgresql.HSTORE` - HSTORE datatype

* :class:`_postgresql.hstore` - hstore literal
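As a small, hedged sketch, the :class:`_postgresql.hstore` construct with two
array arguments compiles to the two-argument ``hstore(keys, values)``
function (the key/value strings here are invented for illustration):

```python
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import array, hstore

# hstore(keys, values) built from two ARRAY[...] literals
expr = hstore(array(["k1", "k2"]), array(["v1", "v2"]))
sql = str(expr.compile(dialect=postgresql.dialect()))
print(sql)
```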
ENUM Types
----------

PostgreSQL has an independently creatable TYPE structure which is used
to implement an enumerated type.  This approach introduces significant
complexity on the SQLAlchemy side in terms of when this type should be
CREATED and DROPPED.  The type object is also an independently reflectable
entity.  The following sections should be consulted:

* :class:`_postgresql.ENUM` - DDL and typing support for ENUM.

* :meth:`.PGInspector.get_enums` - retrieve a listing of current ENUM types

* :meth:`.postgresql.ENUM.create`, :meth:`.postgresql.ENUM.drop` - individual
  CREATE and DROP commands for ENUM.
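As a hedged sketch of the shared-type pattern (the enum and table names are
invented for illustration), an :class:`_postgresql.ENUM` can be attached to
the :class:`_schema.MetaData`; the column then renders using the named type,
while the ``CREATE TYPE`` statement itself is emitted separately by
``metadata.create_all()`` or ``my_enum.create()``:

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import ENUM
from sqlalchemy.schema import CreateTable

metadata = MetaData()
# hypothetical enum and table; attaching the ENUM to the MetaData ties
# CREATE TYPE / DROP TYPE to metadata-level create/drop operations
mood = ENUM("happy", "sad", name="mood_enum", metadata=metadata)
person = Table(
    "person",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("mood", mood),
)

# the column renders with the named type; CREATE TYPE is emitted separately
ddl = str(CreateTable(person).compile(dialect=postgresql.dialect()))
print(ddl)
```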
.. _postgresql_array_of_enum:

Using ENUM with ARRAY
^^^^^^^^^^^^^^^^^^^^^

The combination of ENUM and ARRAY is not directly supported by backend
DBAPIs at this time.  Prior to SQLAlchemy 1.3.17, a special workaround
was needed in order to allow this combination to work, described below.

.. versionchanged:: 1.3.17 The combination of ENUM and ARRAY is now directly
   handled by SQLAlchemy's implementation without any workarounds needed.
.. sourcecode:: python

    import re

    import sqlalchemy as sa
    from sqlalchemy import TypeDecorator
    from sqlalchemy.dialects.postgresql import ARRAY

    class ArrayOfEnum(TypeDecorator):
        impl = ARRAY

        def bind_expression(self, bindvalue):
            return sa.cast(bindvalue, self)

        def result_processor(self, dialect, coltype):
            super_rp = super(ArrayOfEnum, self).result_processor(
                dialect, coltype)

            def handle_raw_string(value):
                inner = re.match(r"^{(.*)}$", value).group(1)
                return inner.split(",") if inner else []

            def process(value):
                if value is None:
                    return None
                return super_rp(handle_raw_string(value))

            return process
E.g.::

    Table(
        'mydata', metadata,
        Column('id', Integer, primary_key=True),
        Column('data', ArrayOfEnum(ENUM('a', 'b', 'c', name='myenum')))
    )

This type is not included as a built-in type as it would be incompatible
with a DBAPI that suddenly decides to support ARRAY of ENUM directly in
a new version.
.. _postgresql_array_of_json:

Using JSON/JSONB with ARRAY
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Similar to using ENUM, prior to SQLAlchemy 1.3.17, for an ARRAY of JSON/JSONB
we needed to render the appropriate CAST.  Current psycopg2 drivers
accommodate the result set correctly without any special steps.

.. versionchanged:: 1.3.17 The combination of JSON/JSONB and ARRAY is now
   directly handled by SQLAlchemy's implementation without any workarounds
   needed.

.. sourcecode:: python

    class CastingArray(ARRAY):
        def bind_expression(self, bindvalue):
            return sa.cast(bindvalue, self)

E.g.::

    Table(
        'mydata', metadata,
        Column('id', Integer, primary_key=True),
        Column('data', CastingArray(JSONB))
    )

"""
from collections import defaultdict
import datetime as dt
import re

from . import array as _array
from . import hstore as _hstore
from . import json as _json
from . import ranges as _ranges
from ... import exc
from ... import schema
from ... import sql
from ... import util
from ...engine import default
from ...engine import reflection
from ...sql import compiler
from ...sql import elements
from ...sql import expression
from ...sql import sqltypes
from ...sql import util as sql_util
from ...types import BIGINT
from ...types import BOOLEAN
from ...types import CHAR
from ...types import DATE
from ...types import FLOAT
from ...types import INTEGER
from ...types import NUMERIC
from ...types import REAL
from ...types import SMALLINT
from ...types import TEXT
from ...types import VARCHAR


try:
    from uuid import UUID as _python_UUID  # noqa
except ImportError:
    _python_UUID = None


IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)

AUTOCOMMIT_REGEXP = re.compile(
    r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER|GRANT|REVOKE|"
    "IMPORT FOREIGN SCHEMA|REFRESH MATERIALIZED VIEW|TRUNCATE)",
    re.I | re.UNICODE,
)
RESERVED_WORDS = set(
    [
        "all",
        "analyse",
        "analyze",
        "and",
        "any",
        "array",
        "as",
        "asc",
        "asymmetric",
        "both",
        "case",
        "cast",
        "check",
        "collate",
        "column",
        "constraint",
        "create",
        "current_catalog",
        "current_date",
        "current_role",
        "current_time",
        "current_timestamp",
        "current_user",
        "default",
        "deferrable",
        "desc",
        "distinct",
        "do",
        "else",
        "end",
        "except",
        "false",
        "fetch",
        "for",
        "foreign",
        "from",
        "grant",
        "group",
        "having",
        "in",
        "initially",
        "intersect",
        "into",
        "leading",
        "limit",
        "localtime",
        "localtimestamp",
        "new",
        "not",
        "null",
        "of",
        "off",
        "offset",
        "old",
        "on",
        "only",
        "or",
        "order",
        "placing",
        "primary",
        "references",
        "returning",
        "select",
        "session_user",
        "some",
        "symmetric",
        "table",
        "then",
        "to",
        "trailing",
        "true",
        "union",
        "unique",
        "user",
        "using",
        "variadic",
        "when",
        "where",
        "window",
        "with",
        "authorization",
        "between",
        "binary",
        "cross",
        "current_schema",
        "freeze",
        "full",
        "ilike",
        "inner",
        "is",
        "isnull",
        "join",
        "left",
        "like",
        "natural",
        "notnull",
        "outer",
        "over",
        "overlaps",
        "right",
        "similar",
        "verbose",
    ]
)

_DECIMAL_TYPES = (1231, 1700)
_FLOAT_TYPES = (700, 701, 1021, 1022)
_INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016)
class BYTEA(sqltypes.LargeBinary):
    __visit_name__ = "BYTEA"


class DOUBLE_PRECISION(sqltypes.Float):
    __visit_name__ = "DOUBLE_PRECISION"


class INET(sqltypes.TypeEngine):
    __visit_name__ = "INET"


PGInet = INET


class CIDR(sqltypes.TypeEngine):
    __visit_name__ = "CIDR"


PGCidr = CIDR


class MACADDR(sqltypes.TypeEngine):
    __visit_name__ = "MACADDR"


PGMacAddr = MACADDR


class MONEY(sqltypes.TypeEngine):

    """Provide the PostgreSQL MONEY type.

    .. versionadded:: 1.2

    """

    __visit_name__ = "MONEY"


class OID(sqltypes.TypeEngine):

    """Provide the PostgreSQL OID type.

    .. versionadded:: 0.9.5

    """

    __visit_name__ = "OID"


class REGCLASS(sqltypes.TypeEngine):

    """Provide the PostgreSQL REGCLASS type.

    .. versionadded:: 1.2.7

    """

    __visit_name__ = "REGCLASS"


class TIMESTAMP(sqltypes.TIMESTAMP):
    def __init__(self, timezone=False, precision=None):
        super(TIMESTAMP, self).__init__(timezone=timezone)
        self.precision = precision


class TIME(sqltypes.TIME):
    def __init__(self, timezone=False, precision=None):
        super(TIME, self).__init__(timezone=timezone)
        self.precision = precision


class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval):

    """PostgreSQL INTERVAL type.

    The INTERVAL type may not be supported on all DBAPIs.
    It is known to work on psycopg2 and not pg8000 or zxjdbc.

    """

    __visit_name__ = "INTERVAL"
    native = True

    def __init__(self, precision=None, fields=None):
        """Construct an INTERVAL.

        :param precision: optional integer precision value
        :param fields: string fields specifier.  allows storage of fields
         to be limited, such as ``"YEAR"``, ``"MONTH"``, ``"DAY TO HOUR"``,
         etc.

         .. versionadded:: 1.2

        """
        self.precision = precision
        self.fields = fields

    @classmethod
    def adapt_emulated_to_native(cls, interval, **kw):
        return INTERVAL(precision=interval.second_precision)

    @property
    def _type_affinity(self):
        return sqltypes.Interval

    @property
    def python_type(self):
        return dt.timedelta


PGInterval = INTERVAL


class BIT(sqltypes.TypeEngine):
    __visit_name__ = "BIT"

    def __init__(self, length=None, varying=False):
        if not varying:
            # BIT without VARYING defaults to length 1
            self.length = length or 1
        else:
            # but BIT VARYING can be unlimited-length, so no default
            self.length = length
        self.varying = varying


PGBit = BIT
class UUID(sqltypes.TypeEngine):

    """PostgreSQL UUID type.

    Represents the UUID column type, interpreting
    data either as natively returned by the DBAPI
    or as Python uuid objects.

    The UUID type may not be supported on all DBAPIs.
    It is known to work on psycopg2 and not pg8000.

    """

    __visit_name__ = "UUID"

    def __init__(self, as_uuid=False):
        """Construct a UUID type.

        :param as_uuid=False: if True, values will be interpreted
         as Python uuid objects, converting to/from string via the
         DBAPI.

        """
        if as_uuid and _python_UUID is None:
            raise NotImplementedError(
                "This version of Python does not support "
                "the native UUID type."
            )
        self.as_uuid = as_uuid

    def bind_processor(self, dialect):
        if self.as_uuid:

            def process(value):
                if value is not None:
                    value = util.text_type(value)
                return value

            return process
        else:
            return None

    def result_processor(self, dialect, coltype):
        if self.as_uuid:

            def process(value):
                if value is not None:
                    value = _python_UUID(value)
                return value

            return process
        else:
            return None


PGUuid = UUID


class TSVECTOR(sqltypes.TypeEngine):

    """The :class:`_postgresql.TSVECTOR` type implements the PostgreSQL
    text search type TSVECTOR.

    It can be used to do full text queries on natural language
    documents.

    .. versionadded:: 0.9.0

    .. seealso::

        :ref:`postgresql_match`

    """

    __visit_name__ = "TSVECTOR"
class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum):

    """PostgreSQL ENUM type.

    This is a subclass of :class:`_types.Enum` which includes
    support for PG's ``CREATE TYPE`` and ``DROP TYPE``.

    When the builtin type :class:`_types.Enum` is used and the
    :paramref:`.Enum.native_enum` flag is left at its default of
    True, the PostgreSQL backend will use a :class:`_postgresql.ENUM`
    type as the implementation, so the special create/drop rules
    will be used.

    The create/drop behavior of ENUM is necessarily intricate, due to the
    awkward relationship the ENUM type has to the
    parent table, in that it may be "owned" by just a single table, or
    may be shared among many tables.

    When using :class:`_types.Enum` or :class:`_postgresql.ENUM`
    in an "inline" fashion, the ``CREATE TYPE`` and ``DROP TYPE`` is emitted
    corresponding to when the :meth:`_schema.Table.create` and
    :meth:`_schema.Table.drop`
    methods are called::

        table = Table('sometable', metadata,
            Column('some_enum', ENUM('a', 'b', 'c', name='myenum'))
        )

        table.create(engine)  # will emit CREATE ENUM and CREATE TABLE
        table.drop(engine)  # will emit DROP TABLE and DROP ENUM

    To use a common enumerated type between multiple tables, the best
    practice is to declare the :class:`_types.Enum` or
    :class:`_postgresql.ENUM` independently, and associate it with the
    :class:`_schema.MetaData` object itself::

        my_enum = ENUM('a', 'b', 'c', name='myenum', metadata=metadata)

        t1 = Table('sometable_one', metadata,
            Column('some_enum', myenum)
        )

        t2 = Table('sometable_two', metadata,
            Column('some_enum', myenum)
        )

    When this pattern is used, care must still be taken at the level
    of individual table creates.  Emitting CREATE TABLE without also
    specifying ``checkfirst=True`` will still cause issues::

        t1.create(engine)  # will fail: no such type 'myenum'

    If we specify ``checkfirst=True``, the individual table-level create
    operation will check for the ``ENUM`` and create if not exists::

        # will check if enum exists, and emit CREATE TYPE if not
        t1.create(engine, checkfirst=True)

    When using a metadata-level ENUM type, the type will always be created
    and dropped when the metadata-wide create/drop is called::

        metadata.create_all(engine)  # will emit CREATE TYPE
        metadata.drop_all(engine)  # will emit DROP TYPE

    The type can also be created and dropped directly::

        my_enum.create(engine)
        my_enum.drop(engine)

    .. versionchanged:: 1.0.0 The PostgreSQL :class:`_postgresql.ENUM` type
       now behaves more strictly with regards to CREATE/DROP.  A metadata-level
       ENUM type will only be created and dropped at the metadata level,
       not the table level, with the exception of
       ``table.create(checkfirst=True)``.
       The ``table.drop()`` call will now emit a DROP TYPE for a table-level
       enumerated type.

    """

    native_enum = True

    def __init__(self, *enums, **kw):
        """Construct an :class:`_postgresql.ENUM`.

        Arguments are the same as that of
        :class:`_types.Enum`, but also including
        the following parameters.

        :param create_type: Defaults to True.
         Indicates that ``CREATE TYPE`` should be
         emitted, after optionally checking for the
         presence of the type, when the parent
         table is being created; and additionally
         that ``DROP TYPE`` is called when the table
         is dropped.  When ``False``, no check
         will be performed and no ``CREATE TYPE``
         or ``DROP TYPE`` is emitted, unless
         :meth:`~.postgresql.ENUM.create`
         or :meth:`~.postgresql.ENUM.drop`
         are called directly.
         Setting to ``False`` is helpful
         when invoking a creation scheme to a SQL file
         without access to the actual database -
         the :meth:`~.postgresql.ENUM.create` and
         :meth:`~.postgresql.ENUM.drop` methods can
         be used to emit SQL to a target bind.

        """
        self.create_type = kw.pop("create_type", True)
        super(ENUM, self).__init__(*enums, **kw)

    @classmethod
    def adapt_emulated_to_native(cls, impl, **kw):
        """Produce a PostgreSQL native :class:`_postgresql.ENUM` from plain
        :class:`.Enum`.

        """
        kw.setdefault("validate_strings", impl.validate_strings)
        kw.setdefault("name", impl.name)
        kw.setdefault("schema", impl.schema)
        kw.setdefault("inherit_schema", impl.inherit_schema)
        kw.setdefault("metadata", impl.metadata)
        kw.setdefault("_create_events", False)
        kw.setdefault("values_callable", impl.values_callable)
        return cls(**kw)
    def create(self, bind=None, checkfirst=True):
        """Emit ``CREATE TYPE`` for this
        :class:`_postgresql.ENUM`.

        If the underlying dialect does not support
        PostgreSQL CREATE TYPE, no action is taken.

        :param bind: a connectable :class:`_engine.Engine`,
         :class:`_engine.Connection`, or similar object to emit
         SQL.
        :param checkfirst: if ``True``, a query against
         the PG catalog will first be performed to see
         if the type does not already exist before
         creating.

        """
        if not bind.dialect.supports_native_enum:
            return

        if not checkfirst or not bind.dialect.has_type(
            bind, self.name, schema=self.schema
        ):
            bind.execute(CreateEnumType(self))

    def drop(self, bind=None, checkfirst=True):
        """Emit ``DROP TYPE`` for this
        :class:`_postgresql.ENUM`.

        If the underlying dialect does not support
        PostgreSQL DROP TYPE, no action is taken.

        :param bind: a connectable :class:`_engine.Engine`,
         :class:`_engine.Connection`, or similar object to emit
         SQL.
        :param checkfirst: if ``True``, a query against
         the PG catalog will first be performed to see
         if the type actually exists before dropping.

        """
        if not bind.dialect.supports_native_enum:
            return

        if not checkfirst or bind.dialect.has_type(
            bind, self.name, schema=self.schema
        ):
            bind.execute(DropEnumType(self))

    def _check_for_name_in_memos(self, checkfirst, kw):
        """Look in the 'ddl runner' for 'memos', then
        note our name in that collection.

        This is to ensure a particular named enum is operated
        upon only once within any kind of create/drop
        sequence without relying upon "checkfirst".

        """
        if not self.create_type:
            return True
        if "_ddl_runner" in kw:
            ddl_runner = kw["_ddl_runner"]
            if "_pg_enums" in ddl_runner.memo:
                pg_enums = ddl_runner.memo["_pg_enums"]
            else:
                pg_enums = ddl_runner.memo["_pg_enums"] = set()
            present = (self.schema, self.name) in pg_enums
            pg_enums.add((self.schema, self.name))
            return present
        else:
            return False

    def _on_table_create(self, target, bind, checkfirst=False, **kw):
        if (
            checkfirst
            or (
                not self.metadata
                and not kw.get("_is_metadata_operation", False)
            )
            and not self._check_for_name_in_memos(checkfirst, kw)
        ):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_table_drop(self, target, bind, checkfirst=False, **kw):
        if (
            not self.metadata
            and not kw.get("_is_metadata_operation", False)
            and not self._check_for_name_in_memos(checkfirst, kw)
        ):
            self.drop(bind=bind, checkfirst=checkfirst)

    def _on_metadata_create(self, target, bind, checkfirst=False, **kw):
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_metadata_drop(self, target, bind, checkfirst=False, **kw):
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.drop(bind=bind, checkfirst=checkfirst)
colspecs = {
    sqltypes.ARRAY: _array.ARRAY,
    sqltypes.Interval: INTERVAL,
    sqltypes.Enum: ENUM,
    sqltypes.JSON.JSONPathType: _json.JSONPathType,
    sqltypes.JSON: _json.JSON,
}

ischema_names = {
    "_array": _array.ARRAY,
    "hstore": _hstore.HSTORE,
    "json": _json.JSON,
    "jsonb": _json.JSONB,
    "int4range": _ranges.INT4RANGE,
    "int8range": _ranges.INT8RANGE,
    "numrange": _ranges.NUMRANGE,
    "daterange": _ranges.DATERANGE,
    "tsrange": _ranges.TSRANGE,
    "tstzrange": _ranges.TSTZRANGE,
    "integer": INTEGER,
    "bigint": BIGINT,
    "smallint": SMALLINT,
    "character varying": VARCHAR,
    "character": CHAR,
    '"char"': sqltypes.String,
    "name": sqltypes.String,
    "text": TEXT,
    "numeric": NUMERIC,
    "float": FLOAT,
    "real": REAL,
    "inet": INET,
    "cidr": CIDR,
    "uuid": UUID,
    "bit": BIT,
    "bit varying": BIT,
    "macaddr": MACADDR,
    "money": MONEY,
    "oid": OID,
    "regclass": REGCLASS,
    "double precision": DOUBLE_PRECISION,
    "timestamp": TIMESTAMP,
    "timestamp with time zone": TIMESTAMP,
    "timestamp without time zone": TIMESTAMP,
    "time with time zone": TIME,
    "time without time zone": TIME,
    "date": DATE,
    "time": TIME,
    "bytea": BYTEA,
    "boolean": BOOLEAN,
    "interval": INTERVAL,
    "tsvector": TSVECTOR,
}
class PGCompiler(compiler.SQLCompiler):
    def visit_array(self, element, **kw):
        return "ARRAY[%s]" % self.visit_clauselist(element, **kw)

    def visit_slice(self, element, **kw):
        return "%s:%s" % (
            self.process(element.start, **kw),
            self.process(element.stop, **kw),
        )

    def visit_json_getitem_op_binary(
        self, binary, operator, _cast_applied=False, **kw
    ):
        if (
            not _cast_applied
            and binary.type._type_affinity is not sqltypes.JSON
        ):
            kw["_cast_applied"] = True
            return self.process(sql.cast(binary, binary.type), **kw)

        kw["eager_grouping"] = True

        return self._generate_generic_binary(
            binary, " -> " if not _cast_applied else " ->> ", **kw
        )

    def visit_json_path_getitem_op_binary(
        self, binary, operator, _cast_applied=False, **kw
    ):
        if (
            not _cast_applied
            and binary.type._type_affinity is not sqltypes.JSON
        ):
            kw["_cast_applied"] = True
            return self.process(sql.cast(binary, binary.type), **kw)

        kw["eager_grouping"] = True
        return self._generate_generic_binary(
            binary, " #> " if not _cast_applied else " #>> ", **kw
        )

    def visit_getitem_binary(self, binary, operator, **kw):
        return "%s[%s]" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_aggregate_order_by(self, element, **kw):
        return "%s ORDER BY %s" % (
            self.process(element.target, **kw),
            self.process(element.order_by, **kw),
        )

    def visit_match_op_binary(self, binary, operator, **kw):
        if "postgresql_regconfig" in binary.modifiers:
            regconfig = self.render_literal_value(
                binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
            )
            if regconfig:
                return "%s @@ to_tsquery(%s, %s)" % (
                    self.process(binary.left, **kw),
                    regconfig,
                    self.process(binary.right, **kw),
                )
        return "%s @@ to_tsquery(%s)" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_ilike_op_binary(self, binary, operator, **kw):
        escape = binary.modifiers.get("escape", None)

        return "%s ILIKE %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        ) + (
            " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
            if escape
            else ""
        )

    def visit_notilike_op_binary(self, binary, operator, **kw):
        escape = binary.modifiers.get("escape", None)
        return "%s NOT ILIKE %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        ) + (
            " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
            if escape
            else ""
        )

    def visit_empty_set_expr(self, element_types):
        # cast the empty set to the type we are comparing against.  if
        # we are comparing against the null type, pick an arbitrary
        # datatype for the empty set
        return "SELECT %s WHERE 1!=1" % (
            ", ".join(
                "CAST(NULL AS %s)"
                % self.dialect.type_compiler.process(
                    INTEGER() if type_._isnull else type_
                )
                for type_ in element_types or [INTEGER()]
            ),
        )

    def render_literal_value(self, value, type_):
        value = super(PGCompiler, self).render_literal_value(value, type_)

        if self.dialect._backslash_escapes:
            value = value.replace("\\", "\\\\")
        return value

    def visit_sequence(self, seq, **kw):
        return "nextval('%s')" % self.preparer.format_sequence(seq)
    def limit_clause(self, select, **kw):
        text = ""
        if select._limit_clause is not None:
            text += " \n LIMIT " + self.process(select._limit_clause, **kw)
        if select._offset_clause is not None:
            if select._limit_clause is None:
                text += " \n LIMIT ALL"
            text += " OFFSET " + self.process(select._offset_clause, **kw)
        return text

    def format_from_hint_text(self, sqltext, table, hint, iscrud):
        if hint.upper() != "ONLY":
            raise exc.CompileError("Unrecognized hint: %r" % hint)
        return "ONLY " + sqltext

    def get_select_precolumns(self, select, **kw):
        if select._distinct is not False:
            if select._distinct is True:
                return "DISTINCT "
            elif isinstance(select._distinct, (list, tuple)):
                return (
                    "DISTINCT ON ("
                    + ", ".join(
                        [self.process(col, **kw) for col in select._distinct]
                    )
                    + ") "
                )
            else:
                return (
                    "DISTINCT ON ("
                    + self.process(select._distinct, **kw)
                    + ") "
                )
        else:
            return ""

    def for_update_clause(self, select, **kw):

        if select._for_update_arg.read:
            if select._for_update_arg.key_share:
                tmp = " FOR KEY SHARE"
            else:
                tmp = " FOR SHARE"
        elif select._for_update_arg.key_share:
            tmp = " FOR NO KEY UPDATE"
        else:
            tmp = " FOR UPDATE"

        if select._for_update_arg.of:

            tables = util.OrderedSet()
            for c in select._for_update_arg.of:
                tables.update(sql_util.surface_selectables_only(c))

            tmp += " OF " + ", ".join(
                self.process(table, ashint=True, use_schema=False, **kw)
                for table in tables
            )

        if select._for_update_arg.nowait:
            tmp += " NOWAIT"
        if select._for_update_arg.skip_locked:
            tmp += " SKIP LOCKED"

        return tmp

    def returning_clause(self, stmt, returning_cols):

        columns = [
            self._label_select_column(None, c, True, False, {})
            for c in expression._select_iterables(returning_cols)
        ]

        return "RETURNING " + ", ".join(columns)

    def visit_substring_func(self, func, **kw):
        s = self.process(func.clauses.clauses[0], **kw)
        start = self.process(func.clauses.clauses[1], **kw)
        if len(func.clauses.clauses) > 2:
            length = self.process(func.clauses.clauses[2], **kw)
            return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
        else:
            return "SUBSTRING(%s FROM %s)" % (s, start)
    def _on_conflict_target(self, clause, **kw):

        if clause.constraint_target is not None:
            target_text = "ON CONSTRAINT %s" % clause.constraint_target
        elif clause.inferred_target_elements is not None:
            target_text = "(%s)" % ", ".join(
                (
                    self.preparer.quote(c)
                    if isinstance(c, util.string_types)
                    else self.process(c, include_table=False, use_schema=False)
                )
                for c in clause.inferred_target_elements
            )
            if clause.inferred_target_whereclause is not None:
                target_text += " WHERE %s" % self.process(
                    clause.inferred_target_whereclause,
                    include_table=False,
                    use_schema=False,
                )
        else:
            target_text = ""

        return target_text

    def visit_on_conflict_do_nothing(self, on_conflict, **kw):

        target_text = self._on_conflict_target(on_conflict, **kw)

        if target_text:
            return "ON CONFLICT %s DO NOTHING" % target_text
        else:
            return "ON CONFLICT DO NOTHING"

    def visit_on_conflict_do_update(self, on_conflict, **kw):

        clause = on_conflict

        target_text = self._on_conflict_target(on_conflict, **kw)

        action_set_ops = []

        set_parameters = dict(clause.update_values_to_set)
        # create a list of column assignment clauses as tuples

        insert_statement = self.stack[-1]["selectable"]
        cols = insert_statement.table.c
        for c in cols:
            col_key = c.key
            if col_key in set_parameters:
                value = set_parameters.pop(col_key)
                if elements._is_literal(value):
                    value = elements.BindParameter(None, value, type_=c.type)

                else:
                    if (
                        isinstance(value, elements.BindParameter)
                        and value.type._isnull
                    ):
                        value = value._clone()
                        value.type = c.type
                value_text = self.process(value.self_group(), use_schema=False)

                key_text = self.preparer.quote(col_key)
                action_set_ops.append("%s = %s" % (key_text, value_text))

        # check for names that don't match columns
        if set_parameters:
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.statement.table.name,
                    (", ".join("'%s'" % c for c in set_parameters)),
                )
            )
            for k, v in set_parameters.items():
                key_text = (
                    self.preparer.quote(k)
                    if isinstance(k, util.string_types)
                    else self.process(k, use_schema=False)
                )
                value_text = self.process(
                    elements._literal_as_binds(v), use_schema=False
                )
                action_set_ops.append("%s = %s" % (key_text, value_text))

        action_text = ", ".join(action_set_ops)
        if clause.update_whereclause is not None:
            action_text += " WHERE %s" % self.process(
                clause.update_whereclause, include_table=True, use_schema=False
            )

        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the DELETE .. USING clause specific to PostgreSQL."""
        return "USING " + ", ".join(
            t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
            for t in extra_froms
        )
class PGDDLCompiler(compiler.DDLCompiler):
    def get_column_specification(self, column, **kwargs):

        colspec = self.preparer.format_column(column)
        impl_type = column.type.dialect_impl(self.dialect)
        if isinstance(impl_type, sqltypes.TypeDecorator):
            impl_type = impl_type.impl

        if (
            column.primary_key
            and column is column.table._autoincrement_column
            and (
                self.dialect.supports_smallserial
                or not isinstance(impl_type, sqltypes.SmallInteger)
            )
            and (
                column.default is None
                or (
                    isinstance(column.default, schema.Sequence)
                    and column.default.optional
                )
            )
        ):
            if isinstance(impl_type, sqltypes.BigInteger):
                colspec += " BIGSERIAL"
            elif isinstance(impl_type, sqltypes.SmallInteger):
                colspec += " SMALLSERIAL"
            else:
                colspec += " SERIAL"
        else:
            colspec += " " + self.dialect.type_compiler.process(
                column.type,
                type_expression=column,
                identifier_preparer=self.preparer,
            )
            default = self.get_column_default_string(column)
            if default is not None:
                colspec += " DEFAULT " + default

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        if not column.nullable:
            colspec += " NOT NULL"
        return colspec

    def visit_check_constraint(self, constraint):
        if constraint._type_bound:
            typ = list(constraint.columns)[0].type
            if (
                isinstance(typ, sqltypes.ARRAY)
                and isinstance(typ.item_type, sqltypes.Enum)
                and not typ.item_type.native_enum
            ):
                raise exc.CompileError(
                    "PostgreSQL dialect cannot produce the CHECK constraint "
                    "for ARRAY of non-native ENUM; please specify "
                    "create_constraint=False on this Enum datatype."
                )

        return super(PGDDLCompiler, self).visit_check_constraint(constraint)

    def visit_drop_table_comment(self, drop):
        return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
            drop.element
        )

    def visit_create_enum_type(self, create):
        type_ = create.element

        return "CREATE TYPE %s AS ENUM (%s)" % (
            self.preparer.format_type(type_),
            ", ".join(
                self.sql_compiler.process(sql.literal(e), literal_binds=True)
                for e in type_.enums
            ),
        )
2026 def visit_drop_enum_type(self, drop):
2027 type_ = drop.element
2029 return "DROP TYPE %s" % (self.preparer.format_type(type_))
2031 def visit_create_index(self, create):
2032 preparer = self.preparer
2033 index = create.element
2034 self._verify_index_table(index)
2035 text = "CREATE "
2036 if index.unique:
2037 text += "UNIQUE "
2038 text += "INDEX "
2040 if self.dialect._supports_create_index_concurrently:
2041 concurrently = index.dialect_options["postgresql"]["concurrently"]
2042 if concurrently:
2043 text += "CONCURRENTLY "
2045 text += "%s ON %s " % (
2046 self._prepared_index_name(index, include_schema=False),
2047 preparer.format_table(index.table),
2048 )
2050 using = index.dialect_options["postgresql"]["using"]
2051 if using:
2052 text += (
2053 "USING %s "
2054 % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
2055 )
2057 ops = index.dialect_options["postgresql"]["ops"]
2058 text += "(%s)" % (
2059 ", ".join(
2060 [
2061 self.sql_compiler.process(
2062 expr.self_group()
2063 if not isinstance(expr, expression.ColumnClause)
2064 else expr,
2065 include_table=False,
2066 literal_binds=True,
2067 )
2068 + (
2069 (" " + ops[expr.key])
2070 if hasattr(expr, "key") and expr.key in ops
2071 else ""
2072 )
2073 for expr in index.expressions
2074 ]
2075 )
2076 )
2078 withclause = index.dialect_options["postgresql"]["with"]
2080 if withclause:
2081 text += " WITH (%s)" % (
2082 ", ".join(
2083 [
2084 "%s = %s" % storage_parameter
2085 for storage_parameter in withclause.items()
2086 ]
2087 )
2088 )
2090 tablespace_name = index.dialect_options["postgresql"]["tablespace"]
2092 if tablespace_name:
2093 text += " TABLESPACE %s" % preparer.quote(tablespace_name)
2095 whereclause = index.dialect_options["postgresql"]["where"]
2097 if whereclause is not None:
2098 where_compiled = self.sql_compiler.process(
2099 whereclause, include_table=False, literal_binds=True
2100 )
2101 text += " WHERE " + where_compiled
2102 return text
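A standalone sketch of the clause ordering assembled by `visit_create_index()` above, with expressions and options already reduced to plain strings (the helper name and signature are illustrative, not part of the dialect):

```python
# Mirrors the clause ordering above:
# CREATE [UNIQUE] INDEX name ON table [USING method] (cols)
#   [TABLESPACE ts] [WHERE predicate]
def render_create_index(name, table, columns, unique=False, using=None,
                        where=None, tablespace=None):
    text = "CREATE "
    if unique:
        text += "UNIQUE "
    text += "INDEX %s ON %s " % (name, table)
    if using:
        text += "USING %s " % using
    text += "(%s)" % ", ".join(columns)
    if tablespace:
        text += " TABLESPACE %s" % tablespace
    if where is not None:
        text += " WHERE " + where
    return text
```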
2104 def visit_drop_index(self, drop):
2105 index = drop.element
2107 text = "\nDROP INDEX "
2109 if self.dialect._supports_drop_index_concurrently:
2110 concurrently = index.dialect_options["postgresql"]["concurrently"]
2111 if concurrently:
2112 text += "CONCURRENTLY "
2114 text += self._prepared_index_name(index, include_schema=True)
2115 return text
2117 def visit_exclude_constraint(self, constraint, **kw):
2118 text = ""
2119 if constraint.name is not None:
2120 text += "CONSTRAINT %s " % self.preparer.format_constraint(
2121 constraint
2122 )
2123 elements = []
2124 for expr, name, op in constraint._render_exprs:
2125 kw["include_table"] = False
2126 elements.append(
2127 "%s WITH %s" % (self.sql_compiler.process(expr, **kw), op)
2128 )
2129 text += "EXCLUDE USING %s (%s)" % (
2130 self.preparer.validate_sql_phrase(
2131 constraint.using, IDX_USING
2132 ).lower(),
2133 ", ".join(elements),
2134 )
2135 if constraint.where is not None:
2136 text += " WHERE (%s)" % self.sql_compiler.process(
2137 constraint.where, literal_binds=True
2138 )
2139 text += self.define_constraint_deferrability(constraint)
2140 return text
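A standalone sketch of the EXCLUDE constraint rendering above, taking `(expression, operator)` pairs already compiled to strings (the helper name is hypothetical):

```python
# Mirrors visit_exclude_constraint():
# [CONSTRAINT name] EXCLUDE USING method (expr WITH op, ...) [WHERE (...)]
def render_exclude(using, elements, name=None, where=None):
    text = ""
    if name is not None:
        text += "CONSTRAINT %s " % name
    text += "EXCLUDE USING %s (%s)" % (
        using.lower(),
        ", ".join("%s WITH %s" % (expr, op) for expr, op in elements),
    )
    if where is not None:
        text += " WHERE (%s)" % where
    return text
```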
2142 def post_create_table(self, table):
2143 table_opts = []
2144 pg_opts = table.dialect_options["postgresql"]
2146 inherits = pg_opts.get("inherits")
2147 if inherits is not None:
2148 if not isinstance(inherits, (list, tuple)):
2149 inherits = (inherits,)
2150 table_opts.append(
2151 "\n INHERITS ( "
2152 + ", ".join(self.preparer.quote(name) for name in inherits)
2153 + " )"
2154 )
2156 if pg_opts["partition_by"]:
2157 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
2159 if pg_opts["with_oids"] is True:
2160 table_opts.append("\n WITH OIDS")
2161 elif pg_opts["with_oids"] is False:
2162 table_opts.append("\n WITHOUT OIDS")
2164 if pg_opts["on_commit"]:
2165 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
2166 table_opts.append("\n ON COMMIT %s" % on_commit_options)
2168 if pg_opts["tablespace"]:
2169 tablespace_name = pg_opts["tablespace"]
2170 table_opts.append(
2171 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
2172 )
2174 return "".join(table_opts)
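A standalone sketch of `post_create_table()` above: each PostgreSQL table option becomes a trailing clause appended after the column list of `CREATE TABLE` (the helper name is illustrative):

```python
# Mirrors the option ordering above: INHERITS, PARTITION BY,
# WITH/WITHOUT OIDS, ON COMMIT, TABLESPACE.
def render_table_options(inherits=None, partition_by=None, with_oids=None,
                         on_commit=None, tablespace=None):
    opts = []
    if inherits is not None:
        if not isinstance(inherits, (list, tuple)):
            inherits = (inherits,)
        opts.append("\n INHERITS ( " + ", ".join(inherits) + " )")
    if partition_by:
        opts.append("\n PARTITION BY %s" % partition_by)
    if with_oids is True:
        opts.append("\n WITH OIDS")
    elif with_oids is False:
        opts.append("\n WITHOUT OIDS")
    if on_commit:
        opts.append("\n ON COMMIT %s" % on_commit.replace("_", " ").upper())
    if tablespace:
        opts.append("\n TABLESPACE %s" % tablespace)
    return "".join(opts)
```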
2176 def visit_computed_column(self, generated):
2177 if generated.persisted is False:
2178 raise exc.CompileError(
2179 "PostgreSQL computed columns do not support 'virtual' "
2180 "persistence; set the 'persisted' flag to None or True for "
2181 "PostgreSQL support."
2182 )
2184 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
2185 generated.sqltext, include_table=False, literal_binds=True
2186 )
2189class PGTypeCompiler(compiler.GenericTypeCompiler):
2190 def visit_TSVECTOR(self, type_, **kw):
2191 return "TSVECTOR"
2193 def visit_INET(self, type_, **kw):
2194 return "INET"
2196 def visit_CIDR(self, type_, **kw):
2197 return "CIDR"
2199 def visit_MACADDR(self, type_, **kw):
2200 return "MACADDR"
2202 def visit_MONEY(self, type_, **kw):
2203 return "MONEY"
2205 def visit_OID(self, type_, **kw):
2206 return "OID"
2208 def visit_REGCLASS(self, type_, **kw):
2209 return "REGCLASS"
2211 def visit_FLOAT(self, type_, **kw):
2212 if not type_.precision:
2213 return "FLOAT"
2214 else:
2215 return "FLOAT(%(precision)s)" % {"precision": type_.precision}
2217 def visit_DOUBLE_PRECISION(self, type_, **kw):
2218 return "DOUBLE PRECISION"
2220 def visit_BIGINT(self, type_, **kw):
2221 return "BIGINT"
2223 def visit_HSTORE(self, type_, **kw):
2224 return "HSTORE"
2226 def visit_JSON(self, type_, **kw):
2227 return "JSON"
2229 def visit_JSONB(self, type_, **kw):
2230 return "JSONB"
2232 def visit_INT4RANGE(self, type_, **kw):
2233 return "INT4RANGE"
2235 def visit_INT8RANGE(self, type_, **kw):
2236 return "INT8RANGE"
2238 def visit_NUMRANGE(self, type_, **kw):
2239 return "NUMRANGE"
2241 def visit_DATERANGE(self, type_, **kw):
2242 return "DATERANGE"
2244 def visit_TSRANGE(self, type_, **kw):
2245 return "TSRANGE"
2247 def visit_TSTZRANGE(self, type_, **kw):
2248 return "TSTZRANGE"
2250 def visit_datetime(self, type_, **kw):
2251 return self.visit_TIMESTAMP(type_, **kw)
2253 def visit_enum(self, type_, **kw):
2254 if not type_.native_enum or not self.dialect.supports_native_enum:
2255 return super(PGTypeCompiler, self).visit_enum(type_, **kw)
2256 else:
2257 return self.visit_ENUM(type_, **kw)
2259 def visit_ENUM(self, type_, identifier_preparer=None, **kw):
2260 if identifier_preparer is None:
2261 identifier_preparer = self.dialect.identifier_preparer
2263 return identifier_preparer.format_type(type_)
2265 def visit_TIMESTAMP(self, type_, **kw):
2266 return "TIMESTAMP%s %s" % (
2267 "(%d)" % type_.precision
2268 if getattr(type_, "precision", None) is not None
2269 else "",
2270 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
2271 )
2273 def visit_TIME(self, type_, **kw):
2274 return "TIME%s %s" % (
2275 "(%d)" % type_.precision
2276 if getattr(type_, "precision", None) is not None
2277 else "",
2278 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
2279 )
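A standalone sketch of the TIMESTAMP rendering above (TIME follows the same shape): an optional `(precision)` suffix, then `WITH`/`WITHOUT TIME ZONE`. The helper name is hypothetical:

```python
# Mirrors visit_TIMESTAMP(): precision is emitted only when set,
# and the timezone flag selects WITH vs. WITHOUT TIME ZONE.
def render_timestamp(precision=None, timezone=False):
    return "TIMESTAMP%s %s" % (
        "(%d)" % precision if precision is not None else "",
        ("WITH" if timezone else "WITHOUT") + " TIME ZONE",
    )
```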
2281 def visit_INTERVAL(self, type_, **kw):
2282 text = "INTERVAL"
2283 if type_.fields is not None:
2284 text += " " + type_.fields
2285 if type_.precision is not None:
2286 text += " (%d)" % type_.precision
2287 return text
2289 def visit_BIT(self, type_, **kw):
2290 if type_.varying:
2291 compiled = "BIT VARYING"
2292 if type_.length is not None:
2293 compiled += "(%d)" % type_.length
2294 else:
2295 compiled = "BIT(%d)" % type_.length
2296 return compiled
2298 def visit_UUID(self, type_, **kw):
2299 return "UUID"
2301 def visit_large_binary(self, type_, **kw):
2302 return self.visit_BYTEA(type_, **kw)
2304 def visit_BYTEA(self, type_, **kw):
2305 return "BYTEA"
2307 def visit_ARRAY(self, type_, **kw):
2309 # TODO: pass **kw?
2310 inner = self.process(type_.item_type)
2311 return re.sub(
2312 r"((?: COLLATE.*)?)$",
2313 (
2314 r"%s\1"
2315 % (
2316 "[]"
2317 * (type_.dimensions if type_.dimensions is not None else 1)
2318 )
2319 ),
2320 inner,
2321 count=1,
2322 )
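The regex in `visit_ARRAY()` above exists so that the `[]` dimension markers land *before* any trailing `COLLATE` clause of the inner type. A standalone sketch of just that substitution (hypothetical helper name):

```python
import re


# Insert "[]" (repeated per dimension) ahead of an optional trailing
# COLLATE clause, exactly as the re.sub() in visit_ARRAY() does.
def add_array_suffix(inner, dimensions=None):
    return re.sub(
        r"((?: COLLATE.*)?)$",
        r"%s\1" % ("[]" * (dimensions if dimensions is not None else 1)),
        inner,
        count=1,
    )
```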
2325class PGIdentifierPreparer(compiler.IdentifierPreparer):
2327 reserved_words = RESERVED_WORDS
2329 def _unquote_identifier(self, value):
2330 if value[0] == self.initial_quote:
2331 value = value[1:-1].replace(
2332 self.escape_to_quote, self.escape_quote
2333 )
2334 return value
2336 def format_type(self, type_, use_schema=True):
2337 if not type_.name:
2338 raise exc.CompileError("PostgreSQL ENUM type requires a name.")
2340 name = self.quote(type_.name)
2341 effective_schema = self.schema_for_object(type_)
2343 if (
2344 not self.omit_schema
2345 and use_schema
2346 and effective_schema is not None
2347 ):
2348 name = self.quote_schema(effective_schema) + "." + name
2349 return name
2352class PGInspector(reflection.Inspector):
2353 def __init__(self, conn):
2354 reflection.Inspector.__init__(self, conn)
2356 def get_table_oid(self, table_name, schema=None):
2357 """Return the OID for the given table name."""
2359 return self.dialect.get_table_oid(
2360 self.bind, table_name, schema, info_cache=self.info_cache
2361 )
2363 def get_enums(self, schema=None):
2364 """Return a list of ENUM objects.
2366 Each member is a dictionary containing these fields:
2368 * name - name of the enum
2369 * schema - the schema name for the enum.
2370 * visible - boolean, whether or not this enum is visible
2371 in the default search path.
2372 * labels - a list of string labels that apply to the enum.
2374 :param schema: schema name. If None, the default schema
2375 (typically 'public') is used. May also be set to '*' to
2376 indicate that enums should be loaded for all schemas.
2378 .. versionadded:: 1.0.0
2380 """
2381 schema = schema or self.default_schema_name
2382 return self.dialect._load_enums(self.bind, schema)
2384 def get_foreign_table_names(self, schema=None):
2385 """Return a list of FOREIGN TABLE names.
2387 Behavior is similar to that of
2388 :meth:`_reflection.Inspector.get_table_names`,
2389 except that the list is limited to those tables that report a
2390 ``relkind`` value of ``f``.
2392 .. versionadded:: 1.0.0
2394 """
2395 schema = schema or self.default_schema_name
2396 return self.dialect._get_foreign_table_names(self.bind, schema)
2398 def get_view_names(self, schema=None, include=("plain", "materialized")):
2399 """Return all view names in `schema`.
2401 :param schema: Optional, retrieve names from a non-default schema.
2402 For special quoting, use :class:`.quoted_name`.
2404 :param include: specify which types of views to return. Passed
2405 as a string value (for a single type) or a tuple (for any number
2406 of types). Defaults to ``('plain', 'materialized')``.
2408 .. versionadded:: 1.1
2410 """
2412 return self.dialect.get_view_names(
2413 self.bind, schema, info_cache=self.info_cache, include=include
2414 )
2417class CreateEnumType(schema._CreateDropBase):
2418 __visit_name__ = "create_enum_type"
2421class DropEnumType(schema._CreateDropBase):
2422 __visit_name__ = "drop_enum_type"
2425class PGExecutionContext(default.DefaultExecutionContext):
2426 def fire_sequence(self, seq, type_):
2427 return self._execute_scalar(
2428 (
2429 "select nextval('%s')"
2430 % self.dialect.identifier_preparer.format_sequence(seq)
2431 ),
2432 type_,
2433 )
2435 def get_insert_default(self, column):
2436 if column.primary_key and column is column.table._autoincrement_column:
2437 if column.server_default and column.server_default.has_argument:
2439 # pre-execute passive defaults on primary key columns
2440 return self._execute_scalar(
2441 "select %s" % column.server_default.arg, column.type
2442 )
2444 elif column.default is None or (
2445 column.default.is_sequence and column.default.optional
2446 ):
2448 # execute the sequence associated with a SERIAL primary
2449 # key column. for non-primary-key SERIAL, the ID is
2450 # generated server side.
2452 try:
2453 seq_name = column._postgresql_seq_name
2454 except AttributeError:
2455 tab = column.table.name
2456 col = column.name
2457 tab = tab[0 : 29 + max(0, (29 - len(col)))]
2458 col = col[0 : 29 + max(0, (29 - len(tab)))]
2459 name = "%s_%s_seq" % (tab, col)
2460 column._postgresql_seq_name = seq_name = name
2462 if column.table is not None:
2463 effective_schema = self.connection.schema_for_object(
2464 column.table
2465 )
2466 else:
2467 effective_schema = None
2469 if effective_schema is not None:
2470 exc = 'select nextval(\'"%s"."%s"\')' % (
2471 effective_schema,
2472 seq_name,
2473 )
2474 else:
2475 exc = "select nextval('\"%s\"')" % (seq_name,)
2477 return self._execute_scalar(exc, column.type)
2479 return super(PGExecutionContext, self).get_insert_default(column)
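The truncation arithmetic in `get_insert_default()` above approximates how the implicit `SERIAL` sequence name `<table>_<column>_seq` is kept within PostgreSQL's 63-character identifier limit. A standalone sketch of that derivation (the helper name is hypothetical, and real PostgreSQL truncation rules may differ in edge cases):

```python
# Trim the table and column parts before composing the sequence name,
# mirroring the slicing logic above so the result fits in 63 chars.
def serial_seq_name(table, column):
    tab = table[0 : 29 + max(0, 29 - len(column))]
    col = column[0 : 29 + max(0, 29 - len(tab))]
    return "%s_%s_seq" % (tab, col)
```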
2481 def should_autocommit_text(self, statement):
2482 return AUTOCOMMIT_REGEXP.match(statement)
2485class PGDialect(default.DefaultDialect):
2486 name = "postgresql"
2487 supports_alter = True
2488 max_identifier_length = 63
2489 supports_sane_rowcount = True
2491 supports_native_enum = True
2492 supports_native_boolean = True
2493 supports_smallserial = True
2495 supports_sequences = True
2496 sequences_optional = True
2497 preexecute_autoincrement_sequences = True
2498 postfetch_lastrowid = False
2500 supports_comments = True
2501 supports_default_values = True
2502 supports_empty_insert = False
2503 supports_multivalues_insert = True
2504 default_paramstyle = "pyformat"
2505 ischema_names = ischema_names
2506 colspecs = colspecs
2508 statement_compiler = PGCompiler
2509 ddl_compiler = PGDDLCompiler
2510 type_compiler = PGTypeCompiler
2511 preparer = PGIdentifierPreparer
2512 execution_ctx_cls = PGExecutionContext
2513 inspector = PGInspector
2514 isolation_level = None
2516 construct_arguments = [
2517 (
2518 schema.Index,
2519 {
2520 "using": False,
2521 "where": None,
2522 "ops": {},
2523 "concurrently": False,
2524 "with": {},
2525 "tablespace": None,
2526 },
2527 ),
2528 (
2529 schema.Table,
2530 {
2531 "ignore_search_path": False,
2532 "tablespace": None,
2533 "partition_by": None,
2534 "with_oids": None,
2535 "on_commit": None,
2536 "inherits": None,
2537 },
2538 ),
2539 ]
2541 reflection_options = ("postgresql_ignore_search_path",)
2543 _backslash_escapes = True
2544 _supports_create_index_concurrently = True
2545 _supports_drop_index_concurrently = True
2547 def __init__(
2548 self,
2549 isolation_level=None,
2550 json_serializer=None,
2551 json_deserializer=None,
2552 **kwargs
2553 ):
2554 default.DefaultDialect.__init__(self, **kwargs)
2555 self.isolation_level = isolation_level
2556 self._json_deserializer = json_deserializer
2557 self._json_serializer = json_serializer
2559 def initialize(self, connection):
2560 super(PGDialect, self).initialize(connection)
2561 self.implicit_returning = self.server_version_info > (
2562 8,
2563 2,
2564 ) and self.__dict__.get("implicit_returning", True)
2565 self.supports_native_enum = self.server_version_info >= (8, 3)
2566 if not self.supports_native_enum:
2567 self.colspecs = self.colspecs.copy()
2568 # pop base Enum type
2569 self.colspecs.pop(sqltypes.Enum, None)
2570 # psycopg2, others may have placed ENUM here as well
2571 self.colspecs.pop(ENUM, None)
2573 # http://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
2574 self.supports_smallserial = self.server_version_info >= (9, 2)
2576 self._backslash_escapes = (
2577 self.server_version_info < (8, 2)
2578 or connection.scalar("show standard_conforming_strings") == "off"
2579 )
2581 self._supports_create_index_concurrently = (
2582 self.server_version_info >= (8, 2)
2583 )
2584 self._supports_drop_index_concurrently = self.server_version_info >= (
2585 9,
2586 2,
2587 )
2589 def on_connect(self):
2590 if self.isolation_level is not None:
2592 def connect(conn):
2593 self.set_isolation_level(conn, self.isolation_level)
2595 return connect
2596 else:
2597 return None
2599 _isolation_lookup = set(
2600 [
2601 "SERIALIZABLE",
2602 "READ UNCOMMITTED",
2603 "READ COMMITTED",
2604 "REPEATABLE READ",
2605 ]
2606 )
2608 def set_isolation_level(self, connection, level):
2609 level = level.replace("_", " ")
2610 if level not in self._isolation_lookup:
2611 raise exc.ArgumentError(
2612 "Invalid value '%s' for isolation_level. "
2613 "Valid isolation levels for %s are %s"
2614 % (level, self.name, ", ".join(self._isolation_lookup))
2615 )
2616 cursor = connection.cursor()
2617 cursor.execute(
2618 "SET SESSION CHARACTERISTICS AS TRANSACTION "
2619 "ISOLATION LEVEL %s" % level
2620 )
2621 cursor.execute("COMMIT")
2622 cursor.close()
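A standalone sketch (hypothetical helper name) of the isolation-level handling in `set_isolation_level()` above: underscores are normalized to spaces, the value is validated against the PostgreSQL levels, and a `SET SESSION CHARACTERISTICS` statement is composed:

```python
_ISOLATION_LEVELS = {
    "SERIALIZABLE",
    "READ UNCOMMITTED",
    "READ COMMITTED",
    "REPEATABLE READ",
}


def isolation_level_sql(level):
    # Accept both "REPEATABLE_READ" and "REPEATABLE READ" spellings.
    level = level.replace("_", " ")
    if level not in _ISOLATION_LEVELS:
        raise ValueError("invalid isolation level: %r" % level)
    return (
        "SET SESSION CHARACTERISTICS AS TRANSACTION "
        "ISOLATION LEVEL %s" % level
    )
```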
2624 def get_isolation_level(self, connection):
2625 cursor = connection.cursor()
2626 cursor.execute("show transaction isolation level")
2627 val = cursor.fetchone()[0]
2628 cursor.close()
2629 return val.upper()
2631 def do_begin_twophase(self, connection, xid):
2632 self.do_begin(connection.connection)
2634 def do_prepare_twophase(self, connection, xid):
2635 connection.execute("PREPARE TRANSACTION '%s'" % xid)
2637 def do_rollback_twophase(
2638 self, connection, xid, is_prepared=True, recover=False
2639 ):
2640 if is_prepared:
2641 if recover:
2642 # FIXME: ugly hack to get out of transaction
2643 # context when committing recoverable transactions
2644 # Must find a way to make the dbapi not
2645 # open a transaction.
2646 connection.execute("ROLLBACK")
2647 connection.execute("ROLLBACK PREPARED '%s'" % xid)
2648 connection.execute("BEGIN")
2649 self.do_rollback(connection.connection)
2650 else:
2651 self.do_rollback(connection.connection)
2653 def do_commit_twophase(
2654 self, connection, xid, is_prepared=True, recover=False
2655 ):
2656 if is_prepared:
2657 if recover:
2658 connection.execute("ROLLBACK")
2659 connection.execute("COMMIT PREPARED '%s'" % xid)
2660 connection.execute("BEGIN")
2661 self.do_rollback(connection.connection)
2662 else:
2663 self.do_commit(connection.connection)
2665 def do_recover_twophase(self, connection):
2666 resultset = connection.execute(
2667 sql.text("SELECT gid FROM pg_prepared_xacts")
2668 )
2669 return [row[0] for row in resultset]
2671 def _get_default_schema_name(self, connection):
2672 return connection.scalar("select current_schema()")
2674 def has_schema(self, connection, schema):
2675 query = (
2676 "select nspname from pg_namespace " "where lower(nspname)=:schema"
2677 )
2678 cursor = connection.execute(
2679 sql.text(query).bindparams(
2680 sql.bindparam(
2681 "schema",
2682 util.text_type(schema.lower()),
2683 type_=sqltypes.Unicode,
2684 )
2685 )
2686 )
2688 return bool(cursor.first())
2690 def has_table(self, connection, table_name, schema=None):
2691 # seems like case gets folded in pg_class...
2692 if schema is None:
2693 cursor = connection.execute(
2694 sql.text(
2695 "select relname from pg_class c join pg_namespace n on "
2696 "n.oid=c.relnamespace where "
2697 "pg_catalog.pg_table_is_visible(c.oid) "
2698 "and relname=:name"
2699 ).bindparams(
2700 sql.bindparam(
2701 "name",
2702 util.text_type(table_name),
2703 type_=sqltypes.Unicode,
2704 )
2705 )
2706 )
2707 else:
2708 cursor = connection.execute(
2709 sql.text(
2710 "select relname from pg_class c join pg_namespace n on "
2711 "n.oid=c.relnamespace where n.nspname=:schema and "
2712 "relname=:name"
2713 ).bindparams(
2714 sql.bindparam(
2715 "name",
2716 util.text_type(table_name),
2717 type_=sqltypes.Unicode,
2718 ),
2719 sql.bindparam(
2720 "schema",
2721 util.text_type(schema),
2722 type_=sqltypes.Unicode,
2723 ),
2724 )
2725 )
2726 return bool(cursor.first())
2728 def has_sequence(self, connection, sequence_name, schema=None):
2729 if schema is None:
2730 cursor = connection.execute(
2731 sql.text(
2732 "SELECT relname FROM pg_class c join pg_namespace n on "
2733 "n.oid=c.relnamespace where relkind='S' and "
2734 "n.nspname=current_schema() "
2735 "and relname=:name"
2736 ).bindparams(
2737 sql.bindparam(
2738 "name",
2739 util.text_type(sequence_name),
2740 type_=sqltypes.Unicode,
2741 )
2742 )
2743 )
2744 else:
2745 cursor = connection.execute(
2746 sql.text(
2747 "SELECT relname FROM pg_class c join pg_namespace n on "
2748 "n.oid=c.relnamespace where relkind='S' and "
2749 "n.nspname=:schema and relname=:name"
2750 ).bindparams(
2751 sql.bindparam(
2752 "name",
2753 util.text_type(sequence_name),
2754 type_=sqltypes.Unicode,
2755 ),
2756 sql.bindparam(
2757 "schema",
2758 util.text_type(schema),
2759 type_=sqltypes.Unicode,
2760 ),
2761 )
2762 )
2764 return bool(cursor.first())
2766 def has_type(self, connection, type_name, schema=None):
2767 if schema is not None:
2768 query = """
2769 SELECT EXISTS (
2770 SELECT * FROM pg_catalog.pg_type t, pg_catalog.pg_namespace n
2771 WHERE t.typnamespace = n.oid
2772 AND t.typname = :typname
2773 AND n.nspname = :nspname
2774 )
2775 """
2776 query = sql.text(query)
2777 else:
2778 query = """
2779 SELECT EXISTS (
2780 SELECT * FROM pg_catalog.pg_type t
2781 WHERE t.typname = :typname
2782 AND pg_type_is_visible(t.oid)
2783 )
2784 """
2785 query = sql.text(query)
2786 query = query.bindparams(
2787 sql.bindparam(
2788 "typname", util.text_type(type_name), type_=sqltypes.Unicode
2789 )
2790 )
2791 if schema is not None:
2792 query = query.bindparams(
2793 sql.bindparam(
2794 "nspname", util.text_type(schema), type_=sqltypes.Unicode
2795 )
2796 )
2797 cursor = connection.execute(query)
2798 return bool(cursor.scalar())
2800 def _get_server_version_info(self, connection):
2801 v = connection.execute("select version()").scalar()
2802 m = re.match(
2803 r".*(?:PostgreSQL|EnterpriseDB) "
2804 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
2805 v,
2806 )
2807 if not m:
2808 raise AssertionError(
2809 "Could not determine version from string '%s'" % v
2810 )
2811 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
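A standalone sketch of the version parsing above, applying the same regex to a couple of representative `select version()` banners (the helper name is hypothetical):

```python
import re

# Same pattern as _get_server_version_info(): major, optional minor,
# optional patch, tolerating trailing ".N", "devel", or "beta".
_VERSION_RE = re.compile(
    r".*(?:PostgreSQL|EnterpriseDB) "
    r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?"
)


def parse_version(banner):
    m = _VERSION_RE.match(banner)
    if not m:
        raise ValueError("unparseable version string: %r" % banner)
    return tuple(int(x) for x in m.group(1, 2, 3) if x is not None)
```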
2813 @reflection.cache
2814 def get_table_oid(self, connection, table_name, schema=None, **kw):
2815 """Fetch the oid for schema.table_name.
2817 Several reflection methods require the table oid; this method
2818 allows the oid to be fetched once and cached for
2819 subsequent calls.
2821 """
2822 table_oid = None
2823 if schema is not None:
2824 schema_where_clause = "n.nspname = :schema"
2825 else:
2826 schema_where_clause = "pg_catalog.pg_table_is_visible(c.oid)"
2827 query = (
2828 """
2829 SELECT c.oid
2830 FROM pg_catalog.pg_class c
2831 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
2832 WHERE (%s)
2833 AND c.relname = :table_name AND c.relkind in
2834 ('r', 'v', 'm', 'f', 'p')
2835 """
2836 % schema_where_clause
2837 )
2838 # Since we're binding to unicode, table_name and schema_name must be
2839 # unicode.
2840 table_name = util.text_type(table_name)
2841 if schema is not None:
2842 schema = util.text_type(schema)
2843 s = sql.text(query).bindparams(table_name=sqltypes.Unicode)
2844 s = s.columns(oid=sqltypes.Integer)
2845 if schema:
2846 s = s.bindparams(sql.bindparam("schema", type_=sqltypes.Unicode))
2847 c = connection.execute(s, table_name=table_name, schema=schema)
2848 table_oid = c.scalar()
2849 if table_oid is None:
2850 raise exc.NoSuchTableError(table_name)
2851 return table_oid
2853 @reflection.cache
2854 def get_schema_names(self, connection, **kw):
2855 result = connection.execute(
2856 sql.text(
2857 "SELECT nspname FROM pg_namespace "
2858 "WHERE nspname NOT LIKE 'pg_%' "
2859 "ORDER BY nspname"
2860 ).columns(nspname=sqltypes.Unicode)
2861 )
2862 return [name for name, in result]
2864 @reflection.cache
2865 def get_table_names(self, connection, schema=None, **kw):
2866 result = connection.execute(
2867 sql.text(
2868 "SELECT c.relname FROM pg_class c "
2869 "JOIN pg_namespace n ON n.oid = c.relnamespace "
2870 "WHERE n.nspname = :schema AND c.relkind in ('r', 'p')"
2871 ).columns(relname=sqltypes.Unicode),
2872 schema=schema if schema is not None else self.default_schema_name,
2873 )
2874 return [name for name, in result]
2876 @reflection.cache
2877 def _get_foreign_table_names(self, connection, schema=None, **kw):
2878 result = connection.execute(
2879 sql.text(
2880 "SELECT c.relname FROM pg_class c "
2881 "JOIN pg_namespace n ON n.oid = c.relnamespace "
2882 "WHERE n.nspname = :schema AND c.relkind = 'f'"
2883 ).columns(relname=sqltypes.Unicode),
2884 schema=schema if schema is not None else self.default_schema_name,
2885 )
2886 return [name for name, in result]
2888 @reflection.cache
2889 def get_view_names(
2890 self, connection, schema=None, include=("plain", "materialized"), **kw
2891 ):
2893 include_kind = {"plain": "v", "materialized": "m"}
2894 try:
2895 kinds = [include_kind[i] for i in util.to_list(include)]
2896 except KeyError:
2897 raise ValueError(
2898 "include %r unknown, needs to be a sequence containing "
2899 "one or both of 'plain' and 'materialized'" % (include,)
2900 )
2901 if not kinds:
2902 raise ValueError(
2903 "empty include, needs to be a sequence containing "
2904 "one or both of 'plain' and 'materialized'"
2905 )
2907 result = connection.execute(
2908 sql.text(
2909 "SELECT c.relname FROM pg_class c "
2910 "JOIN pg_namespace n ON n.oid = c.relnamespace "
2911 "WHERE n.nspname = :schema AND c.relkind IN (%s)"
2912 % (", ".join("'%s'" % elem for elem in kinds))
2913 ).columns(relname=sqltypes.Unicode),
2914 schema=schema if schema is not None else self.default_schema_name,
2915 )
2916 return [name for name, in result]
2918 @reflection.cache
2919 def get_view_definition(self, connection, view_name, schema=None, **kw):
2920 view_def = connection.scalar(
2921 sql.text(
2922 "SELECT pg_get_viewdef(c.oid) view_def FROM pg_class c "
2923 "JOIN pg_namespace n ON n.oid = c.relnamespace "
2924 "WHERE n.nspname = :schema AND c.relname = :view_name "
2925 "AND c.relkind IN ('v', 'm')"
2926 ).columns(view_def=sqltypes.Unicode),
2927 schema=schema if schema is not None else self.default_schema_name,
2928 view_name=view_name,
2929 )
2930 return view_def
2932 @reflection.cache
2933 def get_columns(self, connection, table_name, schema=None, **kw):
2935 table_oid = self.get_table_oid(
2936 connection, table_name, schema, info_cache=kw.get("info_cache")
2937 )
2939 generated = (
2940 "a.attgenerated as generated"
2941 if self.server_version_info >= (12,)
2942 else "NULL as generated"
2943 )
2944 SQL_COLS = (
2945 """
2946 SELECT a.attname,
2947 pg_catalog.format_type(a.atttypid, a.atttypmod),
2948 (SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
2949 FROM pg_catalog.pg_attrdef d
2950 WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
2951 AND a.atthasdef)
2952 AS DEFAULT,
2953 a.attnotnull, a.attnum, a.attrelid as table_oid,
2954 pgd.description as comment,
2955 %s
2956 FROM pg_catalog.pg_attribute a
2957 LEFT JOIN pg_catalog.pg_description pgd ON (
2958 pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
2959 WHERE a.attrelid = :table_oid
2960 AND a.attnum > 0 AND NOT a.attisdropped
2961 ORDER BY a.attnum
2962 """
2963 % generated
2964 )
2965 s = (
2966 sql.text(SQL_COLS)
2967 .bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer))
2968 .columns(attname=sqltypes.Unicode, default=sqltypes.Unicode)
2969 )
2970 c = connection.execute(s, table_oid=table_oid)
2971 rows = c.fetchall()
2973 # dictionary with (name, ) if default search path or (schema, name)
2974 # as keys
2975 domains = self._load_domains(connection)
2977 # dictionary with (name, ) if default search path or (schema, name)
2978 # as keys
2979 enums = dict(
2980 ((rec["name"],), rec)
2981 if rec["visible"]
2982 else ((rec["schema"], rec["name"]), rec)
2983 for rec in self._load_enums(connection, schema="*")
2984 )
2986 # format columns
2987 columns = []
2989 for (
2990 name,
2991 format_type,
2992 default_,
2993 notnull,
2994 attnum,
2995 table_oid,
2996 comment,
2997 generated,
2998 ) in rows:
2999 column_info = self._get_column_info(
3000 name,
3001 format_type,
3002 default_,
3003 notnull,
3004 domains,
3005 enums,
3006 schema,
3007 comment,
3008 generated,
3009 )
3010 columns.append(column_info)
3011 return columns
3013 def _get_column_info(
3014 self,
3015 name,
3016 format_type,
3017 default,
3018 notnull,
3019 domains,
3020 enums,
3021 schema,
3022 comment,
3023 generated,
3024 ):
3025 def _handle_array_type(attype):
3026 return (
3027 # strip '[]' from integer[], etc.
3028 re.sub(r"\[\]$", "", attype),
3029 attype.endswith("[]"),
3030 )
3032 # strip (*) from character varying(5), timestamp(5)
3033 # with time zone, geometry(POLYGON), etc.
3034 attype = re.sub(r"\(.*\)", "", format_type)
3036 # strip '[]' from integer[], etc. and check if an array
3037 attype, is_array = _handle_array_type(attype)
3039 # strip quotes from case sensitive enum or domain names
3040 enum_or_domain_key = tuple(util.quoted_token_parser(attype))
3042 nullable = not notnull
3044 charlen = re.search(r"\(([\d,]+)\)", format_type)
3045 if charlen:
3046 charlen = charlen.group(1)
3047 args = re.search(r"\((.*)\)", format_type)
3048 if args and args.group(1):
3049 args = tuple(re.split(r"\s*,\s*", args.group(1)))
3050 else:
3051 args = ()
3052 kwargs = {}
3054 if attype == "numeric":
3055 if charlen:
3056 prec, scale = charlen.split(",")
3057 args = (int(prec), int(scale))
3058 else:
3059 args = ()
3060 elif attype == "double precision":
3061 args = (53,)
3062 elif attype == "integer":
3063 args = ()
3064 elif attype in ("timestamp with time zone", "time with time zone"):
3065 kwargs["timezone"] = True
3066 if charlen:
3067 kwargs["precision"] = int(charlen)
3068 args = ()
3069 elif attype in (
3070 "timestamp without time zone",
3071 "time without time zone",
3072 "time",
3073 ):
3074 kwargs["timezone"] = False
3075 if charlen:
3076 kwargs["precision"] = int(charlen)
3077 args = ()
3078 elif attype == "bit varying":
3079 kwargs["varying"] = True
3080 if charlen:
3081 args = (int(charlen),)
3082 else:
3083 args = ()
3084 elif attype.startswith("interval"):
3085 field_match = re.match(r"interval (.+)", attype, re.I)
3086 if charlen:
3087 kwargs["precision"] = int(charlen)
3088 if field_match:
3089 kwargs["fields"] = field_match.group(1)
3090 attype = "interval"
3091 args = ()
3092 elif charlen:
3093 args = (int(charlen),)
3095 while True:
3096 # looping here to suit nested domains
3097 if attype in self.ischema_names:
3098 coltype = self.ischema_names[attype]
3099 break
3100 elif enum_or_domain_key in enums:
3101 enum = enums[enum_or_domain_key]
3102 coltype = ENUM
3103 kwargs["name"] = enum["name"]
3104 if not enum["visible"]:
3105 kwargs["schema"] = enum["schema"]
3106 args = tuple(enum["labels"])
3107 break
3108 elif enum_or_domain_key in domains:
3109 domain = domains[enum_or_domain_key]
3110 attype = domain["attype"]
3111 attype, is_array = _handle_array_type(attype)
3112 # strip quotes from case sensitive enum or domain names
3113 enum_or_domain_key = tuple(util.quoted_token_parser(attype))
3114 # A table can't override whether the domain is nullable.
3115 nullable = domain["nullable"]
3116 if domain["default"] and not default:
3117 # It can, however, override the default
3118 # value, but can't set it to null.
3119 default = domain["default"]
3120 continue
3121 else:
3122 coltype = None
3123 break
3125 if coltype:
3126 coltype = coltype(*args, **kwargs)
3127 if is_array:
3128 coltype = self.ischema_names["_array"](coltype)
3129 else:
3130 util.warn(
3131 "Did not recognize type '%s' of column '%s'" % (attype, name)
3132 )
3133 coltype = sqltypes.NULLTYPE
3135 # If a zero byte (''), then not a generated column.
3136 # Otherwise, s = stored. (Other values might be added in the future.)
3137 if generated:
3138 computed = dict(sqltext=default, persisted=generated == "s")
3139 default = None
3140 else:
3141 computed = None
3143 # adjust the default value
3144 autoincrement = False
3145 if default is not None:
3146 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
3147 if match is not None:
3148 if issubclass(coltype._type_affinity, sqltypes.Integer):
3149 autoincrement = True
3150 # the default is related to a Sequence
3151 sch = schema
3152 if "." not in match.group(2) and sch is not None:
3153 # unconditionally quote the schema name. this could
3154 # later be enhanced to obey quoting rules /
3155 # "quote schema"
3156 default = (
3157 match.group(1)
3158 + ('"%s"' % sch)
3159 + "."
3160 + match.group(2)
3161 + match.group(3)
3162 )
3164 column_info = dict(
3165 name=name,
3166 type=coltype,
3167 nullable=nullable,
3168 default=default,
3169 autoincrement=autoincrement,
3170 comment=comment,
3171 )
3172 if computed is not None:
3173 column_info["computed"] = computed
3174 return column_info
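A standalone sketch of the `format_type()` string parsing that `_get_column_info()` above performs: strip the length/precision arguments, detect and strip a trailing `[]` array suffix, and capture the raw length digits (the helper name is hypothetical):

```python
import re


# Mirrors the preamble of _get_column_info(): given a string from
# pg_catalog.format_type(), return (base type, is_array, charlen).
def parse_format_type(format_type):
    # strip "(5)", "(10,2)", etc.
    attype = re.sub(r"\(.*\)", "", format_type)
    # strip "[]" from "integer[]" etc. and note that it was an array
    is_array = attype.endswith("[]")
    attype = re.sub(r"\[\]$", "", attype)
    charlen = re.search(r"\(([\d,]+)\)", format_type)
    return attype, is_array, charlen.group(1) if charlen else None
```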
    @reflection.cache
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        table_oid = self.get_table_oid(
            connection, table_name, schema, info_cache=kw.get("info_cache")
        )

        if self.server_version_info < (8, 4):
            PK_SQL = """
                SELECT a.attname
                FROM
                    pg_class t
                    join pg_index ix on t.oid = ix.indrelid
                    join pg_attribute a
                        on t.oid=a.attrelid AND %s
                WHERE
                    t.oid = :table_oid and ix.indisprimary = 't'
                ORDER BY a.attnum
            """ % self._pg_index_any(
                "a.attnum", "ix.indkey"
            )

        else:
            # unnest() and generate_subscripts() both introduced in
            # version 8.4
            PK_SQL = """
                SELECT a.attname
                FROM pg_attribute a JOIN (
                    SELECT unnest(ix.indkey) attnum,
                           generate_subscripts(ix.indkey, 1) ord
                    FROM pg_index ix
                    WHERE ix.indrelid = :table_oid AND ix.indisprimary
                    ) k ON a.attnum=k.attnum
                WHERE a.attrelid = :table_oid
                ORDER BY k.ord
            """
        t = sql.text(PK_SQL).columns(attname=sqltypes.Unicode)
        c = connection.execute(t, table_oid=table_oid)
        cols = [r[0] for r in c.fetchall()]

        PK_CONS_SQL = """
            SELECT conname
            FROM pg_catalog.pg_constraint r
            WHERE r.conrelid = :table_oid AND r.contype = 'p'
            ORDER BY 1
        """
        t = sql.text(PK_CONS_SQL).columns(conname=sqltypes.Unicode)
        c = connection.execute(t, table_oid=table_oid)
        name = c.scalar()

        return {"constrained_columns": cols, "name": name}
    @reflection.cache
    def get_foreign_keys(
        self,
        connection,
        table_name,
        schema=None,
        postgresql_ignore_search_path=False,
        **kw
    ):
        preparer = self.identifier_preparer
        table_oid = self.get_table_oid(
            connection, table_name, schema, info_cache=kw.get("info_cache")
        )

        FK_SQL = """
            SELECT r.conname,
                   pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
                   n.nspname as conschema
            FROM pg_catalog.pg_constraint r,
                 pg_namespace n,
                 pg_class c

            WHERE r.conrelid = :table AND
                  r.contype = 'f' AND
                  c.oid = confrelid AND
                  n.oid = c.relnamespace
            ORDER BY 1
        """
        # http://www.postgresql.org/docs/9.0/static/sql-createtable.html
        FK_REGEX = re.compile(
            r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)"
            r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
            r"[\s]?(ON UPDATE "
            r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
            r"[\s]?(ON DELETE "
            r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
            r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
            r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
        )

        t = sql.text(FK_SQL).columns(
            conname=sqltypes.Unicode, condef=sqltypes.Unicode
        )
        c = connection.execute(t, table=table_oid)
        fkeys = []
        for conname, condef, conschema in c.fetchall():
            m = re.search(FK_REGEX, condef).groups()

            (
                constrained_columns,
                referred_schema,
                referred_table,
                referred_columns,
                _,
                match,
                _,
                onupdate,
                _,
                ondelete,
                deferrable,
                _,
                initially,
            ) = m

            if deferrable is not None:
                deferrable = True if deferrable == "DEFERRABLE" else False
            constrained_columns = [
                preparer._unquote_identifier(x)
                for x in re.split(r"\s*,\s*", constrained_columns)
            ]

            if postgresql_ignore_search_path:
                # when ignoring search path, we use the actual schema
                # provided it isn't the "default" schema
                if conschema != self.default_schema_name:
                    referred_schema = conschema
                else:
                    referred_schema = schema
            elif referred_schema:
                # referred_schema is the schema that we regexp'ed from
                # pg_get_constraintdef().  If the schema is in the search
                # path, pg_get_constraintdef() will give us None.
                referred_schema = preparer._unquote_identifier(referred_schema)
            elif schema is not None and schema == conschema:
                # If the actual schema matches the schema of the table
                # we're reflecting, then we will use that.
                referred_schema = schema

            referred_table = preparer._unquote_identifier(referred_table)
            referred_columns = [
                preparer._unquote_identifier(x)
                for x in re.split(r"\s*,\s", referred_columns)
            ]
            fkey_d = {
                "name": conname,
                "constrained_columns": constrained_columns,
                "referred_schema": referred_schema,
                "referred_table": referred_table,
                "referred_columns": referred_columns,
                "options": {
                    "onupdate": onupdate,
                    "ondelete": ondelete,
                    "deferrable": deferrable,
                    "initially": initially,
                    "match": match,
                },
            }
            fkeys.append(fkey_d)
        return fkeys
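To see how FK_REGEX picks a `pg_get_constraintdef()` string apart, it can be run against a sample constraint definition; the `condef` string below is a fabricated example in the documented shape, not output captured from a server:

```python
import re

# Same pattern as FK_REGEX in get_foreign_keys().
FK_REGEX = re.compile(
    r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)"
    r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
    r"[\s]?(ON UPDATE "
    r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
    r"[\s]?(ON DELETE "
    r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
    r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
    r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
)

# Hypothetical constraint definition of the kind pg_get_constraintdef() emits.
condef = "FOREIGN KEY (parent_id) REFERENCES parent(id) ON DELETE CASCADE"
groups = re.search(FK_REGEX, condef).groups()

# Groups 0-3 are the column/table names; group 9 is the ON DELETE action.
constrained, ref_schema, ref_table, ref_cols = groups[0:4]
ondelete = groups[9]
```

Note that `ref_schema` comes back `None` here because the referenced table is unqualified, which is why `get_foreign_keys()` falls back to `conschema` or the reflected schema in that case.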
    def _pg_index_any(self, col, compare_to):
        if self.server_version_info < (8, 1):
            # http://www.postgresql.org/message-id/10279.1124395722@sss.pgh.pa.us
            # "In CVS tip you could replace this with "attnum = ANY (indkey)".
            # Unfortunately, most array support doesn't work on int2vector in
            # pre-8.1 releases, so I think you're kinda stuck with the above
            # for now.
            # regards, tom lane"
            return "(%s)" % " OR ".join(
                "%s[%d] = %s" % (compare_to, ind, col) for ind in range(0, 10)
            )
        else:
            return "%s = ANY(%s)" % (col, compare_to)
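The two SQL fragments this helper emits can be previewed with a free-standing sketch; `pre_81` here is a hypothetical stand-in for the `server_version_info < (8, 1)` check:

```python
def pg_index_any(col, compare_to, pre_81=True):
    # Mirrors _pg_index_any() above: pre-8.1 servers cannot compare against
    # an int2vector with ANY(), so each of the first ten slots is compared
    # explicitly in an OR chain.
    if pre_81:
        return "(%s)" % " OR ".join(
            "%s[%d] = %s" % (compare_to, ind, col) for ind in range(0, 10)
        )
    return "%s = ANY(%s)" % (col, compare_to)

print(pg_index_any("a.attnum", "ix.indkey", pre_81=False))
# a.attnum = ANY(ix.indkey)
```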
    @reflection.cache
    def get_indexes(self, connection, table_name, schema, **kw):
        table_oid = self.get_table_oid(
            connection, table_name, schema, info_cache=kw.get("info_cache")
        )

        # cast indkey as varchar since it's an int2vector,
        # returned as a list by some drivers such as pypostgresql

        if self.server_version_info < (8, 5):
            IDX_SQL = """
                SELECT
                    i.relname as relname,
                    ix.indisunique, ix.indexprs, ix.indpred,
                    a.attname, a.attnum, NULL, ix.indkey%s,
                    %s, %s, am.amname,
                    NULL as indnkeyatts
                FROM
                    pg_class t
                    join pg_index ix on t.oid = ix.indrelid
                    join pg_class i on i.oid = ix.indexrelid
                    left outer join
                        pg_attribute a
                        on t.oid = a.attrelid and %s
                    left outer join
                        pg_am am
                        on i.relam = am.oid
                WHERE
                    t.relkind IN ('r', 'v', 'f', 'm')
                    and t.oid = :table_oid
                    and ix.indisprimary = 'f'
                ORDER BY
                    t.relname,
                    i.relname
            """ % (
                # version 8.3 here was based on observing the
                # cast does not work in PG 8.2.4, does work in 8.3.0.
                # nothing in PG changelogs regarding this.
                "::varchar" if self.server_version_info >= (8, 3) else "",
                "ix.indoption::varchar"
                if self.server_version_info >= (8, 3)
                else "NULL",
                "i.reloptions"
                if self.server_version_info >= (8, 2)
                else "NULL",
                self._pg_index_any("a.attnum", "ix.indkey"),
            )
        else:
            IDX_SQL = """
                SELECT
                    i.relname as relname,
                    ix.indisunique, ix.indexprs, ix.indpred,
                    a.attname, a.attnum, c.conrelid, ix.indkey::varchar,
                    ix.indoption::varchar, i.reloptions, am.amname,
                    %s as indnkeyatts
                FROM
                    pg_class t
                    join pg_index ix on t.oid = ix.indrelid
                    join pg_class i on i.oid = ix.indexrelid
                    left outer join
                        pg_attribute a
                        on t.oid = a.attrelid and a.attnum = ANY(ix.indkey)
                    left outer join
                        pg_constraint c
                        on (ix.indrelid = c.conrelid and
                            ix.indexrelid = c.conindid and
                            c.contype in ('p', 'u', 'x'))
                    left outer join
                        pg_am am
                        on i.relam = am.oid
                WHERE
                    t.relkind IN ('r', 'v', 'f', 'm', 'p')
                    and t.oid = :table_oid
                    and ix.indisprimary = 'f'
                ORDER BY
                    t.relname,
                    i.relname
            """ % (
                "ix.indnkeyatts"
                if self.server_version_info >= (11, 0)
                else "NULL",
            )

        t = sql.text(IDX_SQL).columns(
            relname=sqltypes.Unicode, attname=sqltypes.Unicode
        )
        c = connection.execute(t, table_oid=table_oid)

        indexes = defaultdict(lambda: defaultdict(dict))

        sv_idx_name = None
        for row in c.fetchall():
            (
                idx_name,
                unique,
                expr,
                prd,
                col,
                col_num,
                conrelid,
                idx_key,
                idx_option,
                options,
                amname,
                indnkeyatts,
            ) = row

            if expr:
                if idx_name != sv_idx_name:
                    util.warn(
                        "Skipped unsupported reflection of "
                        "expression-based index %s" % idx_name
                    )
                sv_idx_name = idx_name
                continue

            if prd and not idx_name == sv_idx_name:
                util.warn(
                    "Predicate of partial index %s ignored during reflection"
                    % idx_name
                )
                sv_idx_name = idx_name

            has_idx = idx_name in indexes
            index = indexes[idx_name]
            if col is not None:
                index["cols"][col_num] = col
            if not has_idx:
                idx_keys = idx_key.split()
                # "The number of key columns in the index, not counting any
                # included columns, which are merely stored and do not
                # participate in the index semantics"
                if indnkeyatts and idx_keys[indnkeyatts:]:
                    util.warn(
                        "INCLUDE columns for covering index %s "
                        "ignored during reflection" % (idx_name,)
                    )
                    idx_keys = idx_keys[:indnkeyatts]

                index["key"] = [int(k.strip()) for k in idx_keys]

                # (new in pg 8.3)
                # "pg_index.indoption" is list of ints, one per column/expr.
                # int acts as bitmask: 0x01=DESC, 0x02=NULLSFIRST
                sorting = {}
                for col_idx, col_flags in enumerate(
                    (idx_option or "").split()
                ):
                    col_flags = int(col_flags.strip())
                    col_sorting = ()
                    # try to set flags only if they differ from PG defaults...
                    if col_flags & 0x01:
                        col_sorting += ("desc",)
                        if not (col_flags & 0x02):
                            col_sorting += ("nullslast",)
                    else:
                        if col_flags & 0x02:
                            col_sorting += ("nullsfirst",)
                    if col_sorting:
                        sorting[col_idx] = col_sorting
                if sorting:
                    index["sorting"] = sorting

                index["unique"] = unique
                if conrelid is not None:
                    index["duplicates_constraint"] = idx_name
                if options:
                    index["options"] = dict(
                        [option.split("=") for option in options]
                    )

                # it *might* be nice to include that this is 'btree' in the
                # reflection info.  But we don't want an Index object
                # to have a ``postgresql_using`` in it that is just the
                # default, so for the moment leaving this out.
                if amname and amname != "btree":
                    index["amname"] = amname

        result = []
        for name, idx in indexes.items():
            entry = {
                "name": name,
                "unique": idx["unique"],
                "column_names": [idx["cols"][i] for i in idx["key"]],
            }
            if "duplicates_constraint" in idx:
                entry["duplicates_constraint"] = idx["duplicates_constraint"]
            if "sorting" in idx:
                entry["column_sorting"] = dict(
                    (idx["cols"][idx["key"][i]], value)
                    for i, value in idx["sorting"].items()
                )
            if "options" in idx:
                entry.setdefault("dialect_options", {})[
                    "postgresql_with"
                ] = idx["options"]
            if "amname" in idx:
                entry.setdefault("dialect_options", {})[
                    "postgresql_using"
                ] = idx["amname"]
            result.append(entry)
        return result
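The `pg_index.indoption` bitmask decoding inside `get_indexes()` can be sketched in isolation; the flag strings passed in below are hypothetical samples of what `ix.indoption::varchar` returns:

```python
def decode_indoption(idx_option):
    # Each int in pg_index.indoption is a per-column bitmask:
    # 0x01 = DESC, 0x02 = NULLS FIRST.  Only non-default flags are kept,
    # matching the "differ from PG defaults" logic above.
    sorting = {}
    for col_idx, col_flags in enumerate((idx_option or "").split()):
        col_flags = int(col_flags.strip())
        col_sorting = ()
        if col_flags & 0x01:
            col_sorting += ("desc",)
            # DESC implies NULLS FIRST by default, so NULLS LAST is notable.
            if not (col_flags & 0x02):
                col_sorting += ("nullslast",)
        else:
            if col_flags & 0x02:
                col_sorting += ("nullsfirst",)
        if col_sorting:
            sorting[col_idx] = col_sorting
    return sorting

print(decode_indoption("0 3 2"))  # {1: ('desc',), 2: ('nullsfirst',)}
```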
    @reflection.cache
    def get_unique_constraints(
        self, connection, table_name, schema=None, **kw
    ):
        table_oid = self.get_table_oid(
            connection, table_name, schema, info_cache=kw.get("info_cache")
        )

        UNIQUE_SQL = """
            SELECT
                cons.conname as name,
                cons.conkey as key,
                a.attnum as col_num,
                a.attname as col_name
            FROM
                pg_catalog.pg_constraint cons
                join pg_attribute a
                    on cons.conrelid = a.attrelid AND
                       a.attnum = ANY(cons.conkey)
            WHERE
                cons.conrelid = :table_oid AND
                cons.contype = 'u'
        """

        t = sql.text(UNIQUE_SQL).columns(col_name=sqltypes.Unicode)
        c = connection.execute(t, table_oid=table_oid)

        uniques = defaultdict(lambda: defaultdict(dict))
        for row in c.fetchall():
            uc = uniques[row.name]
            uc["key"] = row.key
            uc["cols"][row.col_num] = row.col_name

        return [
            {"name": name, "column_names": [uc["cols"][i] for i in uc["key"]]}
            for name, uc in uniques.items()
        ]
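The regrouping step above — collecting one row per (constraint, column) pair and then reordering columns by `conkey` — can be shown with fabricated rows; the constraint name and columns here are hypothetical:

```python
from collections import defaultdict

# Hypothetical query rows: a UNIQUE constraint over (b, a), i.e.
# conkey = [2, 1], with attnum/attname pairs arriving in attnum order.
rows = [
    {"name": "uq_ab", "key": [2, 1], "col_num": 1, "col_name": "a"},
    {"name": "uq_ab", "key": [2, 1], "col_num": 2, "col_name": "b"},
]

uniques = defaultdict(lambda: defaultdict(dict))
for row in rows:
    uc = uniques[row["name"]]
    uc["key"] = row["key"]
    uc["cols"][row["col_num"]] = row["col_name"]

result = [
    {"name": name, "column_names": [uc["cols"][i] for i in uc["key"]]}
    for name, uc in uniques.items()
]
print(result)  # [{'name': 'uq_ab', 'column_names': ['b', 'a']}]
```

Indexing `cols` by `attnum` and then walking `conkey` is what preserves the declared constraint order rather than the physical column order.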
    @reflection.cache
    def get_table_comment(self, connection, table_name, schema=None, **kw):
        table_oid = self.get_table_oid(
            connection, table_name, schema, info_cache=kw.get("info_cache")
        )

        COMMENT_SQL = """
            SELECT
                pgd.description as table_comment
            FROM
                pg_catalog.pg_description pgd
            WHERE
                pgd.objsubid = 0 AND
                pgd.objoid = :table_oid
        """

        c = connection.execute(sql.text(COMMENT_SQL), table_oid=table_oid)
        return {"text": c.scalar()}
    @reflection.cache
    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        table_oid = self.get_table_oid(
            connection, table_name, schema, info_cache=kw.get("info_cache")
        )

        CHECK_SQL = """
            SELECT
                cons.conname as name,
                pg_get_constraintdef(cons.oid) as src
            FROM
                pg_catalog.pg_constraint cons
            WHERE
                cons.conrelid = :table_oid AND
                cons.contype = 'c'
        """

        c = connection.execute(sql.text(CHECK_SQL), table_oid=table_oid)

        ret = []
        for name, src in c:
            # samples:
            # "CHECK (((a > 1) AND (a < 5)))"
            # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
            # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
            # "CHECK (some_boolean_function(a))"
            # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"

            m = re.match(
                r"^CHECK *\((.+)\)( NOT VALID)?$", src, flags=re.DOTALL
            )
            if not m:
                util.warn("Could not parse CHECK constraint text: %r" % src)
                sqltext = ""
            else:
                sqltext = re.compile(
                    r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
                ).sub(r"\1", m.group(1))
            entry = {"name": name, "sqltext": sqltext}
            if m and m.group(2):
                entry["dialect_options"] = {"not_valid": True}

            ret.append(entry)
        return ret
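The two-step CHECK parse above — peel off the `CHECK ( ... )` wrapper (plus an optional `NOT VALID`), then strip one redundant layer of parentheses — can be run on the sample strings from the comments; `parse_check` is a hypothetical helper name used only for this sketch:

```python
import re

def parse_check(src):
    # Step 1: match "CHECK ( ... )" with an optional trailing " NOT VALID".
    m = re.match(r"^CHECK *\((.+)\)( NOT VALID)?$", src, flags=re.DOTALL)
    if not m:
        return None
    # Step 2: drop one extra pair of enclosing parentheses, if present.
    sqltext = re.compile(r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL).sub(
        r"\1", m.group(1)
    )
    return sqltext, bool(m.group(2))

print(parse_check("CHECK (((a > 1) AND (a < 5))) NOT VALID"))
# ('(a > 1) AND (a < 5)', True)
```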
    def _load_enums(self, connection, schema=None):
        schema = schema or self.default_schema_name
        if not self.supports_native_enum:
            return {}

        # Load data types for enums:
        SQL_ENUMS = """
            SELECT t.typname as "name",
                -- no enum defaults in 8.4 at least
                -- t.typdefault as "default",
                pg_catalog.pg_type_is_visible(t.oid) as "visible",
                n.nspname as "schema",
                e.enumlabel as "label"
            FROM pg_catalog.pg_type t
                LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
                LEFT JOIN pg_catalog.pg_enum e ON t.oid = e.enumtypid
            WHERE t.typtype = 'e'
        """

        if schema != "*":
            SQL_ENUMS += "AND n.nspname = :schema "

        # e.oid gives us label order within an enum
        SQL_ENUMS += 'ORDER BY "schema", "name", e.oid'

        s = sql.text(SQL_ENUMS).columns(
            attname=sqltypes.Unicode, label=sqltypes.Unicode
        )

        if schema != "*":
            s = s.bindparams(schema=schema)

        c = connection.execute(s)

        enums = []
        enum_by_name = {}
        for enum in c.fetchall():
            key = (enum["schema"], enum["name"])
            if key in enum_by_name:
                enum_by_name[key]["labels"].append(enum["label"])
            else:
                enum_by_name[key] = enum_rec = {
                    "name": enum["name"],
                    "schema": enum["schema"],
                    "visible": enum["visible"],
                    "labels": [],
                }
                if enum["label"] is not None:
                    enum_rec["labels"].append(enum["label"])
                enums.append(enum_rec)
        return enums
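Because the enum query returns one row per label (or a single NULL-label row for a memberless type), the grouping loop above folds rows into one record per (schema, name). A sketch over fabricated rows — the enum names and labels here are hypothetical:

```python
# Hypothetical rows as the enum query might return them, already ordered
# by schema, name, and label oid (which fixes label order).
rows = [
    {"schema": "public", "name": "mood", "visible": True, "label": "sad"},
    {"schema": "public", "name": "mood", "visible": True, "label": "ok"},
    {"schema": "public", "name": "mood", "visible": True, "label": "happy"},
    # A NULL label from the LEFT JOIN means an enum type with no members.
    {"schema": "public", "name": "empty_enum", "visible": True, "label": None},
]

enums = []
enum_by_name = {}
for enum in rows:
    key = (enum["schema"], enum["name"])
    if key in enum_by_name:
        enum_by_name[key]["labels"].append(enum["label"])
    else:
        enum_by_name[key] = enum_rec = {
            "name": enum["name"],
            "schema": enum["schema"],
            "visible": enum["visible"],
            "labels": [],
        }
        if enum["label"] is not None:
            enum_rec["labels"].append(enum["label"])
        enums.append(enum_rec)
```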
    def _load_domains(self, connection):
        # Load data types for domains:
        SQL_DOMAINS = """
            SELECT t.typname as "name",
                pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
                not t.typnotnull as "nullable",
                t.typdefault as "default",
                pg_catalog.pg_type_is_visible(t.oid) as "visible",
                n.nspname as "schema"
            FROM pg_catalog.pg_type t
                LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
            WHERE t.typtype = 'd'
        """

        s = sql.text(SQL_DOMAINS).columns(attname=sqltypes.Unicode)
        c = connection.execute(s)

        domains = {}
        for domain in c.fetchall():
            # strip (30) from character varying(30)
            attype = re.search(r"([^\(]+)", domain["attype"]).group(1)
            # 'visible' just means whether or not the domain is in a
            # schema that's on the search path -- or not overridden by
            # a schema with higher precedence.  If it's not visible,
            # it will be prefixed with the schema-name when it's used.
            if domain["visible"]:
                key = (domain["name"],)
            else:
                key = (domain["schema"], domain["name"])

            domains[key] = {
                "attype": attype,
                "nullable": domain["nullable"],
                "default": domain["default"],
            }

        return domains