Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# mssql/base.py 

2# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: http://www.opensource.org/licenses/mit-license.php 

7""" 

8.. dialect:: mssql 

9 :name: Microsoft SQL Server 

10 

11 

12.. _mssql_identity: 

13 

14Auto Increment Behavior / IDENTITY Columns 

15------------------------------------------ 

16 

17SQL Server provides so-called "auto incrementing" behavior using the 

18``IDENTITY`` construct, which can be placed on any single integer column in a 

19table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement" 

20behavior for an integer primary key column, described at 

21:paramref:`_schema.Column.autoincrement`. This means that by default, 

22the first 

23integer primary key column in a :class:`_schema.Table` 

24will be considered to be the 

25identity column and will generate DDL as such:: 

26 

27 from sqlalchemy import Table, MetaData, Column, Integer 

28 

29 m = MetaData() 

30 t = Table('t', m, 

31 Column('id', Integer, primary_key=True), 

32 Column('x', Integer)) 

33 m.create_all(engine) 

34 

35The above example will generate DDL as: 

36 

37.. sourcecode:: sql 

38 

39 CREATE TABLE t ( 

40 id INTEGER NOT NULL IDENTITY(1,1), 

41 x INTEGER NULL, 

42 PRIMARY KEY (id) 

43 ) 

44 

45For the case where this default generation of ``IDENTITY`` is not desired, 

46specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag, 

47on the first integer primary key column:: 

48 

49 m = MetaData() 

50 t = Table('t', m, 

51 Column('id', Integer, primary_key=True, autoincrement=False), 

52 Column('x', Integer)) 

53 m.create_all(engine) 

54 

55To add the ``IDENTITY`` keyword to a non-primary key column, specify 

56``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired 

57:class:`_schema.Column` object, and ensure that 

58:paramref:`_schema.Column.autoincrement` 

59is set to ``False`` on any integer primary key column:: 

60 

61 m = MetaData() 

62 t = Table('t', m, 

63 Column('id', Integer, primary_key=True, autoincrement=False), 

64 Column('x', Integer, autoincrement=True)) 

65 m.create_all(engine) 

66 

67.. versionchanged:: 1.3 Added ``mssql_identity_start`` and 

68 ``mssql_identity_increment`` parameters to :class:`_schema.Column`. 

69 These replace 

70 the use of the :class:`.Sequence` object in order to specify these values. 

71 

72.. deprecated:: 1.3 

73 

74 The use of :class:`.Sequence` to specify IDENTITY characteristics is 

75 deprecated and will be removed in a future release. Please use 

76 the ``mssql_identity_start`` and ``mssql_identity_increment`` parameters 

77 documented at :ref:`mssql_identity`. 

78 

79.. note:: 

80 

81 There can only be one IDENTITY column on the table. When using 

82 ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not 

83 guard against multiple columns specifying the option simultaneously. The 

84 SQL Server database will instead reject the ``CREATE TABLE`` statement. 

85 

86.. note:: 

87 

88 An INSERT statement which attempts to provide a value for a column that is 

89 marked with IDENTITY will be rejected by SQL Server. In order for the 

90 value to be accepted, a session-level option "SET IDENTITY_INSERT" must be 

91 enabled. The SQLAlchemy SQL Server dialect will perform this operation 

92 automatically when using a core :class:`_expression.Insert` 

93 construct; if the 

94 execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT" 

95 option will be enabled for the span of that statement's invocation.However, 

96 this scenario is not high performing and should not be relied upon for 

97 normal use. If a table doesn't actually require IDENTITY behavior in its 

98 integer primary key column, the keyword should be disabled when creating 

99 the table by ensuring that ``autoincrement=False`` is set. 

100 

101Controlling "Start" and "Increment" 

102^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

103 

104Specific control over the "start" and "increment" values for 

105the ``IDENTITY`` generator are provided using the 

106``mssql_identity_start`` and ``mssql_identity_increment`` parameters 

107passed to the :class:`_schema.Column` object:: 

108 

109 from sqlalchemy import Table, Integer, Column 

110 

111 test = Table( 

112 'test', metadata, 

113 Column( 

114 'id', Integer, primary_key=True, mssql_identity_start=100, 

115 mssql_identity_increment=10 

116 ), 

117 Column('name', String(20)) 

118 ) 

119 

120The CREATE TABLE for the above :class:`_schema.Table` object would be: 

121 

122.. sourcecode:: sql 

123 

124 CREATE TABLE test ( 

125 id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY, 

126 name VARCHAR(20) NULL, 

127 ) 

128 

129.. versionchanged:: 1.3 The ``mssql_identity_start`` and 

130 ``mssql_identity_increment`` parameters are now used to affect the 

131 ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server. 

132 Previously, the :class:`.Sequence` object was used. As SQL Server now 

133 supports real sequences as a separate construct, :class:`.Sequence` will be 

134 functional in the normal way in a future SQLAlchemy version. 

135 

136INSERT behavior 

137^^^^^^^^^^^^^^^^ 

138 

139Handling of the ``IDENTITY`` column at INSERT time involves two key 

140techniques. The most common is being able to fetch the "last inserted value" 

141for a given ``IDENTITY`` column, a process which SQLAlchemy performs 

142implicitly in many cases, most importantly within the ORM. 

143 

144The process for fetching this value has several variants: 

145 

146* In the vast majority of cases, RETURNING is used in conjunction with INSERT 

147 statements on SQL Server in order to get newly generated primary key values: 

148 

149 .. sourcecode:: sql 

150 

151 INSERT INTO t (x) OUTPUT inserted.id VALUES (?) 

152 

153* When RETURNING is not available or has been disabled via 

154 ``implicit_returning=False``, either the ``scope_identity()`` function or 

155 the ``@@identity`` variable is used; behavior varies by backend: 

156 

157 * when using PyODBC, the phrase ``; select scope_identity()`` will be 

158 appended to the end of the INSERT statement; a second result set will be 

159 fetched in order to receive the value. Given a table as:: 

160 

161 t = Table('t', m, Column('id', Integer, primary_key=True), 

162 Column('x', Integer), 

163 implicit_returning=False) 

164 

165 an INSERT will look like: 

166 

167 .. sourcecode:: sql 

168 

169 INSERT INTO t (x) VALUES (?); select scope_identity() 

170 

171 * Other dialects such as pymssql will call upon 

172 ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT 

173 statement. If the flag ``use_scope_identity=False`` is passed to 

174 :func:`_sa.create_engine`, 

175 the statement ``SELECT @@identity AS lastrowid`` 

176 is used instead. 

177 

178A table that contains an ``IDENTITY`` column will prohibit an INSERT statement 

179that refers to the identity column explicitly. The SQLAlchemy dialect will 

180detect when an INSERT construct, created using a core 

181:func:`_expression.insert` 

182construct (not a plain string SQL), refers to the identity column, and 

183in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert 

184statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the 

185execution. Given this example:: 

186 

187 m = MetaData() 

188 t = Table('t', m, Column('id', Integer, primary_key=True), 

189 Column('x', Integer)) 

190 m.create_all(engine) 

191 

192 with engine.begin() as conn: 

193 conn.execute(t.insert(), {'id': 1, 'x':1}, {'id':2, 'x':2}) 

194 

195The above column will be created with IDENTITY, however the INSERT statement 

196we emit is specifying explicit values. In the echo output we can see 

197how SQLAlchemy handles this: 

198 

199.. sourcecode:: sql 

200 

201 CREATE TABLE t ( 

202 id INTEGER NOT NULL IDENTITY(1,1), 

203 x INTEGER NULL, 

204 PRIMARY KEY (id) 

205 ) 

206 

207 COMMIT 

208 SET IDENTITY_INSERT t ON 

209 INSERT INTO t (id, x) VALUES (?, ?) 

210 ((1, 1), (2, 2)) 

211 SET IDENTITY_INSERT t OFF 

212 COMMIT 

213 

214 

215 

216This 

217is an auxiliary use case suitable for testing and bulk insert scenarios. 

218 

219MAX on VARCHAR / NVARCHAR 

220------------------------- 

221 

222SQL Server supports the special string "MAX" within the 

223:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes, 

224to indicate "maximum length possible". The dialect currently handles this as 

225a length of "None" in the base type, rather than supplying a 

226dialect-specific version of these types, so that a base type 

227specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on 

228more than one backend without using dialect-specific types. 

229 

230To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None:: 

231 

232 my_table = Table( 

233 'my_table', metadata, 

234 Column('my_data', VARCHAR(None)), 

235 Column('my_n_data', NVARCHAR(None)) 

236 ) 

237 

238 

239Collation Support 

240----------------- 

241 

242Character collations are supported by the base string types, 

243specified by the string argument "collation":: 

244 

245 from sqlalchemy import VARCHAR 

246 Column('login', VARCHAR(32, collation='Latin1_General_CI_AS')) 

247 

248When such a column is associated with a :class:`_schema.Table`, the 

249CREATE TABLE statement for this column will yield:: 

250 

251 login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL 

252 

253LIMIT/OFFSET Support 

254-------------------- 

255 

256MSSQL has no support for the LIMIT or OFFSET keywords. LIMIT is 

257supported directly through the ``TOP`` Transact SQL keyword:: 

258 

259 select.limit 

260 

261will yield:: 

262 

263 SELECT TOP n 

264 

265If using SQL Server 2005 or above, LIMIT with OFFSET 

266support is available through the ``ROW_NUMBER OVER`` construct. 

267For versions below 2005, LIMIT with OFFSET usage will fail. 

268 

269.. _mssql_isolation_level: 

270 

271Transaction Isolation Level 

272--------------------------- 

273 

274All SQL Server dialects support setting of transaction isolation level 

275both via a dialect-specific parameter 

276:paramref:`_sa.create_engine.isolation_level` 

277accepted by :func:`_sa.create_engine`, 

278as well as the :paramref:`.Connection.execution_options.isolation_level` 

279argument as passed to 

280:meth:`_engine.Connection.execution_options`. 

281This feature works by issuing the 

282command ``SET TRANSACTION ISOLATION LEVEL <level>`` for 

283each new connection. 

284 

285To set isolation level using :func:`_sa.create_engine`:: 

286 

287 engine = create_engine( 

288 "mssql+pyodbc://scott:tiger@ms_2008", 

289 isolation_level="REPEATABLE READ" 

290 ) 

291 

292To set using per-connection execution options:: 

293 

294 connection = engine.connect() 

295 connection = connection.execution_options( 

296 isolation_level="READ COMMITTED" 

297 ) 

298 

299Valid values for ``isolation_level`` include: 

300 

301* ``AUTOCOMMIT`` - pyodbc / pymssql-specific 

302* ``READ COMMITTED`` 

303* ``READ UNCOMMITTED`` 

304* ``REPEATABLE READ`` 

305* ``SERIALIZABLE`` 

306* ``SNAPSHOT`` - specific to SQL Server 

307 

308.. versionadded:: 1.1 support for isolation level setting on Microsoft 

309 SQL Server. 

310 

311.. versionadded:: 1.2 added AUTOCOMMIT isolation level setting 

312 

313Nullability 

314----------- 

315MSSQL has support for three levels of column nullability. The default 

316nullability allows nulls and is explicit in the CREATE TABLE 

317construct:: 

318 

319 name VARCHAR(20) NULL 

320 

321If ``nullable=None`` is specified then no specification is made. In 

322other words the database's configured default is used. This will 

323render:: 

324 

325 name VARCHAR(20) 

326 

327If ``nullable`` is ``True`` or ``False`` then the column will be 

328``NULL`` or ``NOT NULL`` respectively. 

329 

330Date / Time Handling 

331-------------------- 

332DATE and TIME are supported. Bind parameters are converted 

333to datetime.datetime() objects as required by most MSSQL drivers, 

334and results are processed from strings if needed. 

335The DATE and TIME types are not available for MSSQL 2005 and 

336previous - if a server version below 2008 is detected, DDL 

337for these types will be issued as DATETIME. 

338 

339.. _mssql_large_type_deprecation: 

340 

341Large Text/Binary Type Deprecation 

342---------------------------------- 

343 

344Per 

345`SQL Server 2012/2014 Documentation <http://technet.microsoft.com/en-us/library/ms187993.aspx>`_, 

346the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL 

347Server in a future release. SQLAlchemy normally relates these types to the 

348:class:`.UnicodeText`, :class:`_expression.TextClause` and 

349:class:`.LargeBinary` datatypes. 

350 

351In order to accommodate this change, a new flag ``deprecate_large_types`` 

352is added to the dialect, which will be automatically set based on detection 

353of the server version in use, if not otherwise set by the user. The 

354behavior of this flag is as follows: 

355 

356* When this flag is ``True``, the :class:`.UnicodeText`, 

357 :class:`_expression.TextClause` and 

358 :class:`.LargeBinary` datatypes, when used to render DDL, will render the 

359 types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``, 

360 respectively. This is a new behavior as of the addition of this flag. 

361 

362* When this flag is ``False``, the :class:`.UnicodeText`, 

363 :class:`_expression.TextClause` and 

364 :class:`.LargeBinary` datatypes, when used to render DDL, will render the 

365 types ``NTEXT``, ``TEXT``, and ``IMAGE``, 

366 respectively. This is the long-standing behavior of these types. 

367 

368* The flag begins with the value ``None``, before a database connection is 

369 established. If the dialect is used to render DDL without the flag being 

370 set, it is interpreted the same as ``False``. 

371 

372* On first connection, the dialect detects if SQL Server version 2012 or 

373 greater is in use; if the flag is still at ``None``, it sets it to ``True`` 

374 or ``False`` based on whether 2012 or greater is detected. 

375 

376* The flag can be set to either ``True`` or ``False`` when the dialect 

377 is created, typically via :func:`_sa.create_engine`:: 

378 

379 eng = create_engine("mssql+pymssql://user:pass@host/db", 

380 deprecate_large_types=True) 

381 

382* Complete control over whether the "old" or "new" types are rendered is 

383 available in all SQLAlchemy versions by using the UPPERCASE type objects 

384 instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`, 

385 :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`, 

386 :class:`_mssql.IMAGE` 

387 will always remain fixed and always output exactly that 

388 type. 

389 

390.. versionadded:: 1.0.0 

391 

392.. _multipart_schema_names: 

393 

394Multipart Schema Names 

395---------------------- 

396 

397SQL Server schemas sometimes require multiple parts to their "schema" 

398qualifier, that is, including the database name and owner name as separate 

399tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set 

400at once using the :paramref:`_schema.Table.schema` argument of 

401:class:`_schema.Table`:: 

402 

403 Table( 

404 "some_table", metadata, 

405 Column("q", String(50)), 

406 schema="mydatabase.dbo" 

407 ) 

408 

409When performing operations such as table or component reflection, a schema 

410argument that contains a dot will be split into separate 

411"database" and "owner" components in order to correctly query the SQL 

412Server information schema tables, as these two values are stored separately. 

413Additionally, when rendering the schema name for DDL or SQL, the two 

414components will be quoted separately for case sensitive names and other 

415special characters. Given an argument as below:: 

416 

417 Table( 

418 "some_table", metadata, 

419 Column("q", String(50)), 

420 schema="MyDataBase.dbo" 

421 ) 

422 

423The above schema would be rendered as ``[MyDataBase].dbo``, and also in 

424reflection, would be reflected using "dbo" as the owner and "MyDataBase" 

425as the database name. 

426 

427To control how the schema name is broken into database / owner, 

428specify brackets (which in SQL Server are quoting characters) in the name. 

429Below, the "owner" will be considered as ``MyDataBase.dbo`` and the 

430"database" will be None:: 

431 

432 Table( 

433 "some_table", metadata, 

434 Column("q", String(50)), 

435 schema="[MyDataBase.dbo]" 

436 ) 

437 

438To individually specify both database and owner name with special characters 

439or embedded dots, use two sets of brackets:: 

440 

441 Table( 

442 "some_table", metadata, 

443 Column("q", String(50)), 

444 schema="[MyDataBase.Period].[MyOwner.Dot]" 

445 ) 

446 

447 

448.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as 

449 identifier delimeters splitting the schema into separate database 

450 and owner tokens, to allow dots within either name itself. 

451 

452.. _legacy_schema_rendering: 

453 

454Legacy Schema Mode 

455------------------ 

456 

457Very old versions of the MSSQL dialect introduced the behavior such that a 

458schema-qualified table would be auto-aliased when used in a 

459SELECT statement; given a table:: 

460 

461 account_table = Table( 

462 'account', metadata, 

463 Column('id', Integer, primary_key=True), 

464 Column('info', String(100)), 

465 schema="customer_schema" 

466 ) 

467 

468this legacy mode of rendering would assume that "customer_schema.account" 

469would not be accepted by all parts of the SQL statement, as illustrated 

470below:: 

471 

472 >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True) 

473 >>> print(account_table.select().compile(eng)) 

474 SELECT account_1.id, account_1.info 

475 FROM customer_schema.account AS account_1 

476 

477This mode of behavior is now off by default, as it appears to have served 

478no purpose; however in the case that legacy applications rely upon it, 

479it is available using the ``legacy_schema_aliasing`` argument to 

480:func:`_sa.create_engine` as illustrated above. 

481 

482.. versionchanged:: 1.1 the ``legacy_schema_aliasing`` flag introduced 

483 in version 1.0.5 to allow disabling of legacy mode for schemas now 

484 defaults to False. 

485 

486 

487.. _mssql_indexes: 

488 

489Clustered Index Support 

490----------------------- 

491 

492The MSSQL dialect supports clustered indexes (and primary keys) via the 

493``mssql_clustered`` option. This option is available to :class:`.Index`, 

494:class:`.UniqueConstraint`. and :class:`.PrimaryKeyConstraint`. 

495 

496To generate a clustered index:: 

497 

498 Index("my_index", table.c.x, mssql_clustered=True) 

499 

500which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``. 

501 

502To generate a clustered primary key use:: 

503 

504 Table('my_table', metadata, 

505 Column('x', ...), 

506 Column('y', ...), 

507 PrimaryKeyConstraint("x", "y", mssql_clustered=True)) 

508 

509which will render the table, for example, as:: 

510 

511 CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL, 

512 PRIMARY KEY CLUSTERED (x, y)) 

513 

514Similarly, we can generate a clustered unique constraint using:: 

515 

516 Table('my_table', metadata, 

517 Column('x', ...), 

518 Column('y', ...), 

519 PrimaryKeyConstraint("x"), 

520 UniqueConstraint("y", mssql_clustered=True), 

521 ) 

522 

523To explicitly request a non-clustered primary key (for example, when 

524a separate clustered index is desired), use:: 

525 

526 Table('my_table', metadata, 

527 Column('x', ...), 

528 Column('y', ...), 

529 PrimaryKeyConstraint("x", "y", mssql_clustered=False)) 

530 

531which will render the table, for example, as:: 

532 

533 CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL, 

534 PRIMARY KEY NONCLUSTERED (x, y)) 

535 

536.. versionchanged:: 1.1 the ``mssql_clustered`` option now defaults 

537 to None, rather than False. ``mssql_clustered=False`` now explicitly 

538 renders the NONCLUSTERED clause, whereas None omits the CLUSTERED 

539 clause entirely, allowing SQL Server defaults to take effect. 

540 

541 

542MSSQL-Specific Index Options 

543----------------------------- 

544 

545In addition to clustering, the MSSQL dialect supports other special options 

546for :class:`.Index`. 

547 

548INCLUDE 

549^^^^^^^ 

550 

551The ``mssql_include`` option renders INCLUDE(colname) for the given string 

552names:: 

553 

554 Index("my_index", table.c.x, mssql_include=['y']) 

555 

556would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` 

557 

558.. _mssql_index_where: 

559 

560Filtered Indexes 

561^^^^^^^^^^^^^^^^ 

562 

563The ``mssql_where`` option renders WHERE(condition) for the given string 

564names:: 

565 

566 Index("my_index", table.c.x, mssql_where=table.c.x > 10) 

567 

568would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``. 

569 

570.. versionadded:: 1.3.4 

571 

572Index ordering 

573^^^^^^^^^^^^^^ 

574 

575Index ordering is available via functional expressions, such as:: 

576 

577 Index("my_index", table.c.x.desc()) 

578 

579would render the index as ``CREATE INDEX my_index ON table (x DESC)`` 

580 

581.. seealso:: 

582 

583 :ref:`schema_indexes_functional` 

584 

585Compatibility Levels 

586-------------------- 

587MSSQL supports the notion of setting compatibility levels at the 

588database level. This allows, for instance, to run a database that 

589is compatible with SQL2000 while running on a SQL2005 database 

590server. ``server_version_info`` will always return the database 

591server version information (in this case SQL2005) and not the 

592compatibility level information. Because of this, if running under 

593a backwards compatibility mode SQLAlchemy may attempt to use T-SQL 

594statements that are unable to be parsed by the database server. 

595 

596Triggers 

597-------- 

598 

599SQLAlchemy by default uses OUTPUT INSERTED to get at newly 

600generated primary key values via IDENTITY columns or other 

601server side defaults. MS-SQL does not 

602allow the usage of OUTPUT INSERTED on tables that have triggers. 

603To disable the usage of OUTPUT INSERTED on a per-table basis, 

604specify ``implicit_returning=False`` for each :class:`_schema.Table` 

605which has triggers:: 

606 

607 Table('mytable', metadata, 

608 Column('id', Integer, primary_key=True), 

609 # ..., 

610 implicit_returning=False 

611 ) 

612 

613Declarative form:: 

614 

615 class MyClass(Base): 

616 # ... 

617 __table_args__ = {'implicit_returning':False} 

618 

619 

620This option can also be specified engine-wide using the 

621``implicit_returning=False`` argument on :func:`_sa.create_engine`. 

622 

623.. _mssql_rowcount_versioning: 

624 

625Rowcount Support / ORM Versioning 

626--------------------------------- 

627 

628The SQL Server drivers may have limited ability to return the number 

629of rows updated from an UPDATE or DELETE statement. 

630 

631As of this writing, the PyODBC driver is not able to return a rowcount when 

632OUTPUT INSERTED is used. This impacts the SQLAlchemy ORM's versioning feature 

633in many cases where server-side value generators are in use in that while the 

634versioning operations can succeed, the ORM cannot always check that an UPDATE 

635or DELETE statement matched the number of rows expected, which is how it 

636verifies that the version identifier matched. When this condition occurs, a 

637warning will be emitted but the operation will proceed. 

638 

639The use of OUTPUT INSERTED can be disabled by setting the 

640:paramref:`_schema.Table.implicit_returning` flag to ``False`` on a particular 

641:class:`_schema.Table`, which in declarative looks like:: 

642 

643 class MyTable(Base): 

644 __tablename__ = 'mytable' 

645 id = Column(Integer, primary_key=True) 

646 stuff = Column(String(10)) 

647 timestamp = Column(TIMESTAMP(), default=text('DEFAULT')) 

648 __mapper_args__ = { 

649 'version_id_col': timestamp, 

650 'version_id_generator': False, 

651 } 

652 __table_args__ = { 

653 'implicit_returning': False 

654 } 

655 

656Enabling Snapshot Isolation 

657--------------------------- 

658 

659SQL Server has a default transaction 

660isolation mode that locks entire tables, and causes even mildly concurrent 

661applications to have long held locks and frequent deadlocks. 

662Enabling snapshot isolation for the database as a whole is recommended 

663for modern levels of concurrency support. This is accomplished via the 

664following ALTER DATABASE commands executed at the SQL prompt:: 

665 

666 ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON 

667 

668 ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON 

669 

670Background on SQL Server snapshot isolation is available at 

671http://msdn.microsoft.com/en-us/library/ms175095.aspx. 

672 

673""" # noqa 

674 

675import codecs 

676import datetime 

677import operator 

678import re 

679 

680from . import information_schema as ischema 

681from ... import engine 

682from ... import exc 

683from ... import schema as sa_schema 

684from ... import sql 

685from ... import types as sqltypes 

686from ... import util 

687from ...engine import default 

688from ...engine import reflection 

689from ...sql import compiler 

690from ...sql import expression 

691from ...sql import func 

692from ...sql import quoted_name 

693from ...sql import util as sql_util 

694from ...types import BIGINT 

695from ...types import BINARY 

696from ...types import CHAR 

697from ...types import DATE 

698from ...types import DATETIME 

699from ...types import DECIMAL 

700from ...types import FLOAT 

701from ...types import INTEGER 

702from ...types import NCHAR 

703from ...types import NUMERIC 

704from ...types import NVARCHAR 

705from ...types import SMALLINT 

706from ...types import TEXT 

707from ...types import VARCHAR 

708from ...util import update_wrapper 

709from ...util.langhelpers import public_factory 

710 

711 

712# http://sqlserverbuilds.blogspot.com/ 

713MS_2016_VERSION = (13,) 

714MS_2014_VERSION = (12,) 

715MS_2012_VERSION = (11,) 

716MS_2008_VERSION = (10,) 

717MS_2005_VERSION = (9,) 

718MS_2000_VERSION = (8,) 

719 

720RESERVED_WORDS = set( 

721 [ 

722 "add", 

723 "all", 

724 "alter", 

725 "and", 

726 "any", 

727 "as", 

728 "asc", 

729 "authorization", 

730 "backup", 

731 "begin", 

732 "between", 

733 "break", 

734 "browse", 

735 "bulk", 

736 "by", 

737 "cascade", 

738 "case", 

739 "check", 

740 "checkpoint", 

741 "close", 

742 "clustered", 

743 "coalesce", 

744 "collate", 

745 "column", 

746 "commit", 

747 "compute", 

748 "constraint", 

749 "contains", 

750 "containstable", 

751 "continue", 

752 "convert", 

753 "create", 

754 "cross", 

755 "current", 

756 "current_date", 

757 "current_time", 

758 "current_timestamp", 

759 "current_user", 

760 "cursor", 

761 "database", 

762 "dbcc", 

763 "deallocate", 

764 "declare", 

765 "default", 

766 "delete", 

767 "deny", 

768 "desc", 

769 "disk", 

770 "distinct", 

771 "distributed", 

772 "double", 

773 "drop", 

774 "dump", 

775 "else", 

776 "end", 

777 "errlvl", 

778 "escape", 

779 "except", 

780 "exec", 

781 "execute", 

782 "exists", 

783 "exit", 

784 "external", 

785 "fetch", 

786 "file", 

787 "fillfactor", 

788 "for", 

789 "foreign", 

790 "freetext", 

791 "freetexttable", 

792 "from", 

793 "full", 

794 "function", 

795 "goto", 

796 "grant", 

797 "group", 

798 "having", 

799 "holdlock", 

800 "identity", 

801 "identity_insert", 

802 "identitycol", 

803 "if", 

804 "in", 

805 "index", 

806 "inner", 

807 "insert", 

808 "intersect", 

809 "into", 

810 "is", 

811 "join", 

812 "key", 

813 "kill", 

814 "left", 

815 "like", 

816 "lineno", 

817 "load", 

818 "merge", 

819 "national", 

820 "nocheck", 

821 "nonclustered", 

822 "not", 

823 "null", 

824 "nullif", 

825 "of", 

826 "off", 

827 "offsets", 

828 "on", 

829 "open", 

830 "opendatasource", 

831 "openquery", 

832 "openrowset", 

833 "openxml", 

834 "option", 

835 "or", 

836 "order", 

837 "outer", 

838 "over", 

839 "percent", 

840 "pivot", 

841 "plan", 

842 "precision", 

843 "primary", 

844 "print", 

845 "proc", 

846 "procedure", 

847 "public", 

848 "raiserror", 

849 "read", 

850 "readtext", 

851 "reconfigure", 

852 "references", 

853 "replication", 

854 "restore", 

855 "restrict", 

856 "return", 

857 "revert", 

858 "revoke", 

859 "right", 

860 "rollback", 

861 "rowcount", 

862 "rowguidcol", 

863 "rule", 

864 "save", 

865 "schema", 

866 "securityaudit", 

867 "select", 

868 "session_user", 

869 "set", 

870 "setuser", 

871 "shutdown", 

872 "some", 

873 "statistics", 

874 "system_user", 

875 "table", 

876 "tablesample", 

877 "textsize", 

878 "then", 

879 "to", 

880 "top", 

881 "tran", 

882 "transaction", 

883 "trigger", 

884 "truncate", 

885 "tsequal", 

886 "union", 

887 "unique", 

888 "unpivot", 

889 "update", 

890 "updatetext", 

891 "use", 

892 "user", 

893 "values", 

894 "varying", 

895 "view", 

896 "waitfor", 

897 "when", 

898 "where", 

899 "while", 

900 "with", 

901 "writetext", 

902 ] 

903) 

904 

905 

906class REAL(sqltypes.REAL): 

907 __visit_name__ = "REAL" 

908 

909 def __init__(self, **kw): 

910 # REAL is a synonym for FLOAT(24) on SQL server. 

911 # it is only accepted as the word "REAL" in DDL, the numeric 

912 # precision value is not allowed to be present 

913 kw.setdefault("precision", 24) 

914 super(REAL, self).__init__(**kw) 

915 

916 

917class TINYINT(sqltypes.Integer): 

918 __visit_name__ = "TINYINT" 

919 

920 

921# MSSQL DATE/TIME types have varied behavior, sometimes returning 

922# strings. MSDate/TIME check for everything, and always 

923# filter bind parameters into datetime objects (required by pyodbc, 

924# not sure about other dialects). 

925 

926 

927class _MSDate(sqltypes.Date): 

928 def bind_processor(self, dialect): 

929 def process(value): 

930 if type(value) == datetime.date: 

931 return datetime.datetime(value.year, value.month, value.day) 

932 else: 

933 return value 

934 

935 return process 

936 

937 _reg = re.compile(r"(\d+)-(\d+)-(\d+)") 

938 

939 def result_processor(self, dialect, coltype): 

940 def process(value): 

941 if isinstance(value, datetime.datetime): 

942 return value.date() 

943 elif isinstance(value, util.string_types): 

944 m = self._reg.match(value) 

945 if not m: 

946 raise ValueError( 

947 "could not parse %r as a date value" % (value,) 

948 ) 

949 return datetime.date(*[int(x or 0) for x in m.groups()]) 

950 else: 

951 return value 

952 

953 return process 

954 

955 

956class TIME(sqltypes.TIME): 

957 def __init__(self, precision=None, **kwargs): 

958 self.precision = precision 

959 super(TIME, self).__init__() 

960 

961 __zero_date = datetime.date(1900, 1, 1) 

962 

963 def bind_processor(self, dialect): 

964 def process(value): 

965 if isinstance(value, datetime.datetime): 

966 value = datetime.datetime.combine( 

967 self.__zero_date, value.time() 

968 ) 

969 elif isinstance(value, datetime.time): 

970 """ issue #5339 

971 per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns 

972 pass TIME value as string 

973 """ # noqa 

974 value = str(value) 

975 return value 

976 

977 return process 

978 

979 _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?") 

980 

981 def result_processor(self, dialect, coltype): 

982 def process(value): 

983 if isinstance(value, datetime.datetime): 

984 return value.time() 

985 elif isinstance(value, util.string_types): 

986 m = self._reg.match(value) 

987 if not m: 

988 raise ValueError( 

989 "could not parse %r as a time value" % (value,) 

990 ) 

991 return datetime.time(*[int(x or 0) for x in m.groups()]) 

992 else: 

993 return value 

994 

995 return process 

996 

997 

998_MSTime = TIME 

999 

1000 

1001class _DateTimeBase(object): 

1002 def bind_processor(self, dialect): 

1003 def process(value): 

1004 if type(value) == datetime.date: 

1005 return datetime.datetime(value.year, value.month, value.day) 

1006 else: 

1007 return value 

1008 

1009 return process 

1010 

1011 

1012class _MSDateTime(_DateTimeBase, sqltypes.DateTime): 

1013 pass 

1014 

1015 

1016class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime): 

1017 __visit_name__ = "SMALLDATETIME" 

1018 

1019 

1020class DATETIME2(_DateTimeBase, sqltypes.DateTime): 

1021 __visit_name__ = "DATETIME2" 

1022 

1023 def __init__(self, precision=None, **kw): 

1024 super(DATETIME2, self).__init__(**kw) 

1025 self.precision = precision 

1026 

1027 

1028# TODO: is this not an Interval ? 

1029class DATETIMEOFFSET(sqltypes.TypeEngine): 

1030 __visit_name__ = "DATETIMEOFFSET" 

1031 

1032 def __init__(self, precision=None, **kwargs): 

1033 self.precision = precision 

1034 

1035 

1036class _UnicodeLiteral(object): 

1037 def literal_processor(self, dialect): 

1038 def process(value): 

1039 

1040 value = value.replace("'", "''") 

1041 

1042 if dialect.identifier_preparer._double_percents: 

1043 value = value.replace("%", "%%") 

1044 

1045 return "N'%s'" % value 

1046 

1047 return process 

1048 

1049 

1050class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode): 

1051 pass 

1052 

1053 

1054class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText): 

1055 pass 

1056 

1057 

1058class TIMESTAMP(sqltypes._Binary): 

1059 """Implement the SQL Server TIMESTAMP type. 

1060 

1061 Note this is **completely different** than the SQL Standard 

1062 TIMESTAMP type, which is not supported by SQL Server. It 

1063 is a read-only datatype that does not support INSERT of values. 

1064 

1065 .. versionadded:: 1.2 

1066 

1067 .. seealso:: 

1068 

1069 :class:`_mssql.ROWVERSION` 

1070 

1071 """ 

1072 

1073 __visit_name__ = "TIMESTAMP" 

1074 

1075 # expected by _Binary to be present 

1076 length = None 

1077 

1078 def __init__(self, convert_int=False): 

1079 """Construct a TIMESTAMP or ROWVERSION type. 

1080 

1081 :param convert_int: if True, binary integer values will 

1082 be converted to integers on read. 

1083 

1084 .. versionadded:: 1.2 

1085 

1086 """ 

1087 self.convert_int = convert_int 

1088 

1089 def result_processor(self, dialect, coltype): 

1090 super_ = super(TIMESTAMP, self).result_processor(dialect, coltype) 

1091 if self.convert_int: 

1092 

1093 def process(value): 

1094 value = super_(value) 

1095 if value is not None: 

1096 # https://stackoverflow.com/a/30403242/34549 

1097 value = int(codecs.encode(value, "hex"), 16) 

1098 return value 

1099 

1100 return process 

1101 else: 

1102 return super_ 

1103 

1104 

1105class ROWVERSION(TIMESTAMP): 

1106 """Implement the SQL Server ROWVERSION type. 

1107 

1108 The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP 

1109 datatype, however current SQL Server documentation suggests using 

1110 ROWVERSION for new datatypes going forward. 

1111 

1112 The ROWVERSION datatype does **not** reflect (e.g. introspect) from the 

1113 database as itself; the returned datatype will be 

1114 :class:`_mssql.TIMESTAMP`. 

1115 

1116 This is a read-only datatype that does not support INSERT of values. 

1117 

1118 .. versionadded:: 1.2 

1119 

1120 .. seealso:: 

1121 

1122 :class:`_mssql.TIMESTAMP` 

1123 

1124 """ 

1125 

1126 __visit_name__ = "ROWVERSION" 

1127 

1128 

1129class NTEXT(sqltypes.UnicodeText): 

1130 

1131 """MSSQL NTEXT type, for variable-length unicode text up to 2^30 

1132 characters.""" 

1133 

1134 __visit_name__ = "NTEXT" 

1135 

1136 

1137class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary): 

1138 """The MSSQL VARBINARY type. 

1139 

1140 This type is present to support "deprecate_large_types" mode where 

1141 either ``VARBINARY(max)`` or IMAGE is rendered. Otherwise, this type 

1142 object is redundant vs. :class:`_types.VARBINARY`. 

1143 

1144 .. versionadded:: 1.0.0 

1145 

1146 .. seealso:: 

1147 

1148 :ref:`mssql_large_type_deprecation` 

1149 

1150 

1151 

1152 """ 

1153 

1154 __visit_name__ = "VARBINARY" 

1155 

1156 

1157class IMAGE(sqltypes.LargeBinary): 

1158 __visit_name__ = "IMAGE" 

1159 

1160 

1161class XML(sqltypes.Text): 

1162 """MSSQL XML type. 

1163 

1164 This is a placeholder type for reflection purposes that does not include 

1165 any Python-side datatype support. It also does not currently support 

1166 additional arguments, such as "CONTENT", "DOCUMENT", 

1167 "xml_schema_collection". 

1168 

1169 .. versionadded:: 1.1.11 

1170 

1171 """ 

1172 

1173 __visit_name__ = "XML" 

1174 

1175 

1176class BIT(sqltypes.TypeEngine): 

1177 __visit_name__ = "BIT" 

1178 

1179 

1180class MONEY(sqltypes.TypeEngine): 

1181 __visit_name__ = "MONEY" 

1182 

1183 

1184class SMALLMONEY(sqltypes.TypeEngine): 

1185 __visit_name__ = "SMALLMONEY" 

1186 

1187 

1188class UNIQUEIDENTIFIER(sqltypes.TypeEngine): 

1189 __visit_name__ = "UNIQUEIDENTIFIER" 

1190 

1191 

1192class SQL_VARIANT(sqltypes.TypeEngine): 

1193 __visit_name__ = "SQL_VARIANT" 

1194 

1195 

1196class TryCast(sql.elements.Cast): 

1197 """Represent a SQL Server TRY_CAST expression. 

1198 

1199 """ 

1200 

1201 __visit_name__ = "try_cast" 

1202 

1203 def __init__(self, *arg, **kw): 

1204 """Create a TRY_CAST expression. 

1205 

1206 :class:`.TryCast` is a subclass of SQLAlchemy's :class:`.Cast` 

1207 construct, and works in the same way, except that the SQL expression 

1208 rendered is "TRY_CAST" rather than "CAST":: 

1209 

1210 from sqlalchemy import select 

1211 from sqlalchemy import Numeric 

1212 from sqlalchemy.dialects.mssql import try_cast 

1213 

1214 stmt = select([ 

1215 try_cast(product_table.c.unit_price, Numeric(10, 4)) 

1216 ]) 

1217 

1218 The above would render:: 

1219 

1220 SELECT TRY_CAST (product_table.unit_price AS NUMERIC(10, 4)) 

1221 FROM product_table 

1222 

1223 .. versionadded:: 1.3.7 

1224 

1225 """ 

1226 super(TryCast, self).__init__(*arg, **kw) 

1227 

1228 

1229try_cast = public_factory(TryCast, ".dialects.mssql.try_cast") 

1230 

1231# old names. 

1232MSDateTime = _MSDateTime 

1233MSDate = _MSDate 

1234MSReal = REAL 

1235MSTinyInteger = TINYINT 

1236MSTime = TIME 

1237MSSmallDateTime = SMALLDATETIME 

1238MSDateTime2 = DATETIME2 

1239MSDateTimeOffset = DATETIMEOFFSET 

1240MSText = TEXT 

1241MSNText = NTEXT 

1242MSString = VARCHAR 

1243MSNVarchar = NVARCHAR 

1244MSChar = CHAR 

1245MSNChar = NCHAR 

1246MSBinary = BINARY 

1247MSVarBinary = VARBINARY 

1248MSImage = IMAGE 

1249MSBit = BIT 

1250MSMoney = MONEY 

1251MSSmallMoney = SMALLMONEY 

1252MSUniqueIdentifier = UNIQUEIDENTIFIER 

1253MSVariant = SQL_VARIANT 

1254 

1255ischema_names = { 

1256 "int": INTEGER, 

1257 "bigint": BIGINT, 

1258 "smallint": SMALLINT, 

1259 "tinyint": TINYINT, 

1260 "varchar": VARCHAR, 

1261 "nvarchar": NVARCHAR, 

1262 "char": CHAR, 

1263 "nchar": NCHAR, 

1264 "text": TEXT, 

1265 "ntext": NTEXT, 

1266 "decimal": DECIMAL, 

1267 "numeric": NUMERIC, 

1268 "float": FLOAT, 

1269 "datetime": DATETIME, 

1270 "datetime2": DATETIME2, 

1271 "datetimeoffset": DATETIMEOFFSET, 

1272 "date": DATE, 

1273 "time": TIME, 

1274 "smalldatetime": SMALLDATETIME, 

1275 "binary": BINARY, 

1276 "varbinary": VARBINARY, 

1277 "bit": BIT, 

1278 "real": REAL, 

1279 "image": IMAGE, 

1280 "xml": XML, 

1281 "timestamp": TIMESTAMP, 

1282 "money": MONEY, 

1283 "smallmoney": SMALLMONEY, 

1284 "uniqueidentifier": UNIQUEIDENTIFIER, 

1285 "sql_variant": SQL_VARIANT, 

1286} 

1287 

1288 

1289class MSTypeCompiler(compiler.GenericTypeCompiler): 

1290 def _extend(self, spec, type_, length=None): 

1291 """Extend a string-type declaration with standard SQL 

1292 COLLATE annotations. 

1293 

1294 """ 

1295 

1296 if getattr(type_, "collation", None): 

1297 collation = "COLLATE %s" % type_.collation 

1298 else: 

1299 collation = None 

1300 

1301 if not length: 

1302 length = type_.length 

1303 

1304 if length: 

1305 spec = spec + "(%s)" % length 

1306 

1307 return " ".join([c for c in (spec, collation) if c is not None]) 

1308 

1309 def visit_FLOAT(self, type_, **kw): 

1310 precision = getattr(type_, "precision", None) 

1311 if precision is None: 

1312 return "FLOAT" 

1313 else: 

1314 return "FLOAT(%(precision)s)" % {"precision": precision} 

1315 

1316 def visit_TINYINT(self, type_, **kw): 

1317 return "TINYINT" 

1318 

1319 def visit_DATETIMEOFFSET(self, type_, **kw): 

1320 if type_.precision is not None: 

1321 return "DATETIMEOFFSET(%s)" % type_.precision 

1322 else: 

1323 return "DATETIMEOFFSET" 

1324 

1325 def visit_TIME(self, type_, **kw): 

1326 precision = getattr(type_, "precision", None) 

1327 if precision is not None: 

1328 return "TIME(%s)" % precision 

1329 else: 

1330 return "TIME" 

1331 

1332 def visit_TIMESTAMP(self, type_, **kw): 

1333 return "TIMESTAMP" 

1334 

1335 def visit_ROWVERSION(self, type_, **kw): 

1336 return "ROWVERSION" 

1337 

1338 def visit_DATETIME2(self, type_, **kw): 

1339 precision = getattr(type_, "precision", None) 

1340 if precision is not None: 

1341 return "DATETIME2(%s)" % precision 

1342 else: 

1343 return "DATETIME2" 

1344 

1345 def visit_SMALLDATETIME(self, type_, **kw): 

1346 return "SMALLDATETIME" 

1347 

1348 def visit_unicode(self, type_, **kw): 

1349 return self.visit_NVARCHAR(type_, **kw) 

1350 

1351 def visit_text(self, type_, **kw): 

1352 if self.dialect.deprecate_large_types: 

1353 return self.visit_VARCHAR(type_, **kw) 

1354 else: 

1355 return self.visit_TEXT(type_, **kw) 

1356 

1357 def visit_unicode_text(self, type_, **kw): 

1358 if self.dialect.deprecate_large_types: 

1359 return self.visit_NVARCHAR(type_, **kw) 

1360 else: 

1361 return self.visit_NTEXT(type_, **kw) 

1362 

1363 def visit_NTEXT(self, type_, **kw): 

1364 return self._extend("NTEXT", type_) 

1365 

1366 def visit_TEXT(self, type_, **kw): 

1367 return self._extend("TEXT", type_) 

1368 

1369 def visit_VARCHAR(self, type_, **kw): 

1370 return self._extend("VARCHAR", type_, length=type_.length or "max") 

1371 

1372 def visit_CHAR(self, type_, **kw): 

1373 return self._extend("CHAR", type_) 

1374 

1375 def visit_NCHAR(self, type_, **kw): 

1376 return self._extend("NCHAR", type_) 

1377 

1378 def visit_NVARCHAR(self, type_, **kw): 

1379 return self._extend("NVARCHAR", type_, length=type_.length or "max") 

1380 

1381 def visit_date(self, type_, **kw): 

1382 if self.dialect.server_version_info < MS_2008_VERSION: 

1383 return self.visit_DATETIME(type_, **kw) 

1384 else: 

1385 return self.visit_DATE(type_, **kw) 

1386 

1387 def visit_time(self, type_, **kw): 

1388 if self.dialect.server_version_info < MS_2008_VERSION: 

1389 return self.visit_DATETIME(type_, **kw) 

1390 else: 

1391 return self.visit_TIME(type_, **kw) 

1392 

1393 def visit_large_binary(self, type_, **kw): 

1394 if self.dialect.deprecate_large_types: 

1395 return self.visit_VARBINARY(type_, **kw) 

1396 else: 

1397 return self.visit_IMAGE(type_, **kw) 

1398 

1399 def visit_IMAGE(self, type_, **kw): 

1400 return "IMAGE" 

1401 

1402 def visit_XML(self, type_, **kw): 

1403 return "XML" 

1404 

1405 def visit_VARBINARY(self, type_, **kw): 

1406 return self._extend("VARBINARY", type_, length=type_.length or "max") 

1407 

1408 def visit_boolean(self, type_, **kw): 

1409 return self.visit_BIT(type_) 

1410 

1411 def visit_BIT(self, type_, **kw): 

1412 return "BIT" 

1413 

1414 def visit_MONEY(self, type_, **kw): 

1415 return "MONEY" 

1416 

1417 def visit_SMALLMONEY(self, type_, **kw): 

1418 return "SMALLMONEY" 

1419 

1420 def visit_UNIQUEIDENTIFIER(self, type_, **kw): 

1421 return "UNIQUEIDENTIFIER" 

1422 

1423 def visit_SQL_VARIANT(self, type_, **kw): 

1424 return "SQL_VARIANT" 

1425 

1426 

1427class MSExecutionContext(default.DefaultExecutionContext): 

1428 _enable_identity_insert = False 

1429 _select_lastrowid = False 

1430 _result_proxy = None 

1431 _lastrowid = None 

1432 

1433 def _opt_encode(self, statement): 

1434 if not self.dialect.supports_unicode_statements: 

1435 return self.dialect._encoder(statement)[0] 

1436 else: 

1437 return statement 

1438 

1439 def pre_exec(self): 

1440 """Activate IDENTITY_INSERT if needed.""" 

1441 

1442 if self.isinsert: 

1443 tbl = self.compiled.statement.table 

1444 seq_column = tbl._autoincrement_column 

1445 insert_has_sequence = seq_column is not None 

1446 

1447 if insert_has_sequence: 

1448 self._enable_identity_insert = ( 

1449 seq_column.key in self.compiled_parameters[0] 

1450 ) or ( 

1451 self.compiled.statement.parameters 

1452 and ( 

1453 ( 

1454 self.compiled.statement._has_multi_parameters 

1455 and ( 

1456 seq_column.key 

1457 in self.compiled.statement.parameters[0] 

1458 or seq_column 

1459 in self.compiled.statement.parameters[0] 

1460 ) 

1461 ) 

1462 or ( 

1463 not self.compiled.statement._has_multi_parameters 

1464 and ( 

1465 seq_column.key 

1466 in self.compiled.statement.parameters 

1467 or seq_column 

1468 in self.compiled.statement.parameters 

1469 ) 

1470 ) 

1471 ) 

1472 ) 

1473 else: 

1474 self._enable_identity_insert = False 

1475 

1476 self._select_lastrowid = ( 

1477 not self.compiled.inline 

1478 and insert_has_sequence 

1479 and not self.compiled.returning 

1480 and not self._enable_identity_insert 

1481 and not self.executemany 

1482 ) 

1483 

1484 if self._enable_identity_insert: 

1485 self.root_connection._cursor_execute( 

1486 self.cursor, 

1487 self._opt_encode( 

1488 "SET IDENTITY_INSERT %s ON" 

1489 % self.dialect.identifier_preparer.format_table(tbl) 

1490 ), 

1491 (), 

1492 self, 

1493 ) 

1494 

1495 def post_exec(self): 

1496 """Disable IDENTITY_INSERT if enabled.""" 

1497 

1498 conn = self.root_connection 

1499 if self._select_lastrowid: 

1500 if self.dialect.use_scope_identity: 

1501 conn._cursor_execute( 

1502 self.cursor, 

1503 "SELECT scope_identity() AS lastrowid", 

1504 (), 

1505 self, 

1506 ) 

1507 else: 

1508 conn._cursor_execute( 

1509 self.cursor, "SELECT @@identity AS lastrowid", (), self 

1510 ) 

1511 # fetchall() ensures the cursor is consumed without closing it 

1512 row = self.cursor.fetchall()[0] 

1513 self._lastrowid = int(row[0]) 

1514 

1515 if ( 

1516 self.isinsert or self.isupdate or self.isdelete 

1517 ) and self.compiled.returning: 

1518 self._result_proxy = engine.FullyBufferedResultProxy(self) 

1519 

1520 if self._enable_identity_insert: 

1521 conn._cursor_execute( 

1522 self.cursor, 

1523 self._opt_encode( 

1524 "SET IDENTITY_INSERT %s OFF" 

1525 % self.dialect.identifier_preparer.format_table( 

1526 self.compiled.statement.table 

1527 ) 

1528 ), 

1529 (), 

1530 self, 

1531 ) 

1532 

1533 def get_lastrowid(self): 

1534 return self._lastrowid 

1535 

1536 def handle_dbapi_exception(self, e): 

1537 if self._enable_identity_insert: 

1538 try: 

1539 self.cursor.execute( 

1540 self._opt_encode( 

1541 "SET IDENTITY_INSERT %s OFF" 

1542 % self.dialect.identifier_preparer.format_table( 

1543 self.compiled.statement.table 

1544 ) 

1545 ) 

1546 ) 

1547 except Exception: 

1548 pass 

1549 

1550 def get_result_proxy(self): 

1551 if self._result_proxy: 

1552 return self._result_proxy 

1553 else: 

1554 return engine.ResultProxy(self) 

1555 

1556 

1557class MSSQLCompiler(compiler.SQLCompiler): 

1558 returning_precedes_values = True 

1559 

1560 extract_map = util.update_copy( 

1561 compiler.SQLCompiler.extract_map, 

1562 { 

1563 "doy": "dayofyear", 

1564 "dow": "weekday", 

1565 "milliseconds": "millisecond", 

1566 "microseconds": "microsecond", 

1567 }, 

1568 ) 

1569 

1570 def __init__(self, *args, **kwargs): 

1571 self.tablealiases = {} 

1572 super(MSSQLCompiler, self).__init__(*args, **kwargs) 

1573 

1574 def _with_legacy_schema_aliasing(fn): 

1575 def decorate(self, *arg, **kw): 

1576 if self.dialect.legacy_schema_aliasing: 

1577 return fn(self, *arg, **kw) 

1578 else: 

1579 super_ = getattr(super(MSSQLCompiler, self), fn.__name__) 

1580 return super_(*arg, **kw) 

1581 

1582 return decorate 

1583 

1584 def visit_now_func(self, fn, **kw): 

1585 return "CURRENT_TIMESTAMP" 

1586 

1587 def visit_current_date_func(self, fn, **kw): 

1588 return "GETDATE()" 

1589 

1590 def visit_length_func(self, fn, **kw): 

1591 return "LEN%s" % self.function_argspec(fn, **kw) 

1592 

1593 def visit_char_length_func(self, fn, **kw): 

1594 return "LEN%s" % self.function_argspec(fn, **kw) 

1595 

1596 def visit_concat_op_binary(self, binary, operator, **kw): 

1597 return "%s + %s" % ( 

1598 self.process(binary.left, **kw), 

1599 self.process(binary.right, **kw), 

1600 ) 

1601 

1602 def visit_true(self, expr, **kw): 

1603 return "1" 

1604 

1605 def visit_false(self, expr, **kw): 

1606 return "0" 

1607 

1608 def visit_match_op_binary(self, binary, operator, **kw): 

1609 return "CONTAINS (%s, %s)" % ( 

1610 self.process(binary.left, **kw), 

1611 self.process(binary.right, **kw), 

1612 ) 

1613 

1614 def get_select_precolumns(self, select, **kw): 

1615 """ MS-SQL puts TOP, it's version of LIMIT here """ 

1616 

1617 s = "" 

1618 if select._distinct: 

1619 s += "DISTINCT " 

1620 

1621 if select._simple_int_limit and ( 

1622 select._offset_clause is None 

1623 or (select._simple_int_offset and select._offset == 0) 

1624 ): 

1625 # ODBC drivers and possibly others 

1626 # don't support bind params in the SELECT clause on SQL Server. 

1627 # so have to use literal here. 

1628 s += "TOP %d " % select._limit 

1629 

1630 if s: 

1631 return s 

1632 else: 

1633 return compiler.SQLCompiler.get_select_precolumns( 

1634 self, select, **kw 

1635 ) 

1636 

1637 def get_from_hint_text(self, table, text): 

1638 return text 

1639 

1640 def get_crud_hint_text(self, table, text): 

1641 return text 

1642 

1643 def limit_clause(self, select, **kw): 

1644 # Limit in mssql is after the select keyword 

1645 return "" 

1646 

1647 def visit_try_cast(self, element, **kw): 

1648 return "TRY_CAST (%s AS %s)" % ( 

1649 self.process(element.clause, **kw), 

1650 self.process(element.typeclause, **kw), 

1651 ) 

1652 

1653 def visit_select(self, select, **kwargs): 

1654 """Look for ``LIMIT`` and OFFSET in a select statement, and if 

1655 so tries to wrap it in a subquery with ``row_number()`` criterion. 

1656 

1657 """ 

1658 if ( 

1659 (not select._simple_int_limit and select._limit_clause is not None) 

1660 or ( 

1661 select._offset_clause is not None 

1662 and not select._simple_int_offset 

1663 or select._offset 

1664 ) 

1665 ) and not getattr(select, "_mssql_visit", None): 

1666 

1667 # to use ROW_NUMBER(), an ORDER BY is required. 

1668 if not select._order_by_clause.clauses: 

1669 raise exc.CompileError( 

1670 "MSSQL requires an order_by when " 

1671 "using an OFFSET or a non-simple " 

1672 "LIMIT clause" 

1673 ) 

1674 

1675 _order_by_clauses = [ 

1676 sql_util.unwrap_label_reference(elem) 

1677 for elem in select._order_by_clause.clauses 

1678 ] 

1679 

1680 limit_clause = select._limit_clause 

1681 offset_clause = select._offset_clause 

1682 kwargs["select_wraps_for"] = select 

1683 select = select._generate() 

1684 select._mssql_visit = True 

1685 select = ( 

1686 select.column( 

1687 sql.func.ROW_NUMBER() 

1688 .over(order_by=_order_by_clauses) 

1689 .label("mssql_rn") 

1690 ) 

1691 .order_by(None) 

1692 .alias() 

1693 ) 

1694 

1695 mssql_rn = sql.column("mssql_rn") 

1696 limitselect = sql.select( 

1697 [c for c in select.c if c.key != "mssql_rn"] 

1698 ) 

1699 if offset_clause is not None: 

1700 limitselect.append_whereclause(mssql_rn > offset_clause) 

1701 if limit_clause is not None: 

1702 limitselect.append_whereclause( 

1703 mssql_rn <= (limit_clause + offset_clause) 

1704 ) 

1705 else: 

1706 limitselect.append_whereclause(mssql_rn <= (limit_clause)) 

1707 return self.process(limitselect, **kwargs) 

1708 else: 

1709 return compiler.SQLCompiler.visit_select(self, select, **kwargs) 

1710 

1711 @_with_legacy_schema_aliasing 

1712 def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs): 

1713 if mssql_aliased is table or iscrud: 

1714 return super(MSSQLCompiler, self).visit_table(table, **kwargs) 

1715 

1716 # alias schema-qualified tables 

1717 alias = self._schema_aliased_table(table) 

1718 if alias is not None: 

1719 return self.process(alias, mssql_aliased=table, **kwargs) 

1720 else: 

1721 return super(MSSQLCompiler, self).visit_table(table, **kwargs) 

1722 

1723 @_with_legacy_schema_aliasing 

1724 def visit_alias(self, alias, **kw): 

1725 # translate for schema-qualified table aliases 

1726 kw["mssql_aliased"] = alias.original 

1727 return super(MSSQLCompiler, self).visit_alias(alias, **kw) 

1728 

1729 @_with_legacy_schema_aliasing 

1730 def visit_column(self, column, add_to_result_map=None, **kw): 

1731 if ( 

1732 column.table is not None 

1733 and (not self.isupdate and not self.isdelete) 

1734 or self.is_subquery() 

1735 ): 

1736 # translate for schema-qualified table aliases 

1737 t = self._schema_aliased_table(column.table) 

1738 if t is not None: 

1739 converted = expression._corresponding_column_or_error( 

1740 t, column 

1741 ) 

1742 if add_to_result_map is not None: 

1743 add_to_result_map( 

1744 column.name, 

1745 column.name, 

1746 (column, column.name, column.key), 

1747 column.type, 

1748 ) 

1749 

1750 return super(MSSQLCompiler, self).visit_column(converted, **kw) 

1751 

1752 return super(MSSQLCompiler, self).visit_column( 

1753 column, add_to_result_map=add_to_result_map, **kw 

1754 ) 

1755 

1756 def _schema_aliased_table(self, table): 

1757 if getattr(table, "schema", None) is not None: 

1758 if table not in self.tablealiases: 

1759 self.tablealiases[table] = table.alias() 

1760 return self.tablealiases[table] 

1761 else: 

1762 return None 

1763 

1764 def visit_extract(self, extract, **kw): 

1765 field = self.extract_map.get(extract.field, extract.field) 

1766 return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw)) 

1767 

1768 def visit_savepoint(self, savepoint_stmt): 

1769 return "SAVE TRANSACTION %s" % self.preparer.format_savepoint( 

1770 savepoint_stmt 

1771 ) 

1772 

1773 def visit_rollback_to_savepoint(self, savepoint_stmt): 

1774 return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint( 

1775 savepoint_stmt 

1776 ) 

1777 

1778 def visit_binary(self, binary, **kwargs): 

1779 """Move bind parameters to the right-hand side of an operator, where 

1780 possible. 

1781 

1782 """ 

1783 if ( 

1784 isinstance(binary.left, expression.BindParameter) 

1785 and binary.operator == operator.eq 

1786 and not isinstance(binary.right, expression.BindParameter) 

1787 ): 

1788 return self.process( 

1789 expression.BinaryExpression( 

1790 binary.right, binary.left, binary.operator 

1791 ), 

1792 **kwargs 

1793 ) 

1794 return super(MSSQLCompiler, self).visit_binary(binary, **kwargs) 

1795 

1796 def returning_clause(self, stmt, returning_cols): 

1797 

1798 if self.isinsert or self.isupdate: 

1799 target = stmt.table.alias("inserted") 

1800 else: 

1801 target = stmt.table.alias("deleted") 

1802 

1803 adapter = sql_util.ClauseAdapter(target) 

1804 

1805 columns = [ 

1806 self._label_select_column( 

1807 None, adapter.traverse(c), True, False, {} 

1808 ) 

1809 for c in expression._select_iterables(returning_cols) 

1810 ] 

1811 

1812 return "OUTPUT " + ", ".join(columns) 

1813 

1814 def get_cte_preamble(self, recursive): 

1815 # SQL Server finds it too inconvenient to accept 

1816 # an entirely optional, SQL standard specified, 

1817 # "RECURSIVE" word with their "WITH", 

1818 # so here we go 

1819 return "WITH" 

1820 

1821 def label_select_column(self, select, column, asfrom): 

1822 if isinstance(column, expression.Function): 

1823 return column.label(None) 

1824 else: 

1825 return super(MSSQLCompiler, self).label_select_column( 

1826 select, column, asfrom 

1827 ) 

1828 

1829 def for_update_clause(self, select): 

1830 # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which 

1831 # SQLAlchemy doesn't use 

1832 return "" 

1833 

1834 def order_by_clause(self, select, **kw): 

1835 # MSSQL only allows ORDER BY in subqueries if there is a LIMIT 

1836 if self.is_subquery() and not select._limit: 

1837 # avoid processing the order by clause if we won't end up 

1838 # using it, because we don't want all the bind params tacked 

1839 # onto the positional list if that is what the dbapi requires 

1840 return "" 

1841 

1842 order_by = self.process(select._order_by_clause, **kw) 

1843 

1844 if order_by: 

1845 return " ORDER BY " + order_by 

1846 else: 

1847 return "" 

1848 

1849 def update_from_clause( 

1850 self, update_stmt, from_table, extra_froms, from_hints, **kw 

1851 ): 

1852 """Render the UPDATE..FROM clause specific to MSSQL. 

1853 

1854 In MSSQL, if the UPDATE statement involves an alias of the table to 

1855 be updated, then the table itself must be added to the FROM list as 

1856 well. Otherwise, it is optional. Here, we add it regardless. 

1857 

1858 """ 

1859 return "FROM " + ", ".join( 

1860 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) 

1861 for t in [from_table] + extra_froms 

1862 ) 

1863 

1864 def delete_table_clause(self, delete_stmt, from_table, extra_froms): 

1865 """If we have extra froms make sure we render any alias as hint.""" 

1866 ashint = False 

1867 if extra_froms: 

1868 ashint = True 

1869 return from_table._compiler_dispatch( 

1870 self, asfrom=True, iscrud=True, ashint=ashint 

1871 ) 

1872 

1873 def delete_extra_from_clause( 

1874 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

1875 ): 

1876 """Render the DELETE .. FROM clause specific to MSSQL. 

1877 

1878 Yes, it has the FROM keyword twice. 

1879 

1880 """ 

1881 return "FROM " + ", ".join( 

1882 t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) 

1883 for t in [from_table] + extra_froms 

1884 ) 

1885 

1886 def visit_empty_set_expr(self, type_): 

1887 return "SELECT 1 WHERE 1!=1" 

1888 

1889 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

1890 return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % ( 

1891 self.process(binary.left), 

1892 self.process(binary.right), 

1893 ) 

1894 

1895 def visit_isnot_distinct_from_binary(self, binary, operator, **kw): 

1896 return "EXISTS (SELECT %s INTERSECT SELECT %s)" % ( 

1897 self.process(binary.left), 

1898 self.process(binary.right), 

1899 ) 

1900 

1901 

1902class MSSQLStrictCompiler(MSSQLCompiler): 

1903 

1904 """A subclass of MSSQLCompiler which disables the usage of bind 

1905 parameters where not allowed natively by MS-SQL. 

1906 

1907 A dialect may use this compiler on a platform where native 

1908 binds are used. 

1909 

1910 """ 

1911 

1912 ansi_bind_rules = True 

1913 

1914 def visit_in_op_binary(self, binary, operator, **kw): 

1915 kw["literal_binds"] = True 

1916 return "%s IN %s" % ( 

1917 self.process(binary.left, **kw), 

1918 self.process(binary.right, **kw), 

1919 ) 

1920 

1921 def visit_notin_op_binary(self, binary, operator, **kw): 

1922 kw["literal_binds"] = True 

1923 return "%s NOT IN %s" % ( 

1924 self.process(binary.left, **kw), 

1925 self.process(binary.right, **kw), 

1926 ) 

1927 

1928 def render_literal_value(self, value, type_): 

1929 """ 

1930 For date and datetime values, convert to a string 

1931 format acceptable to MSSQL. That seems to be the 

1932 so-called ODBC canonical date format which looks 

1933 like this: 

1934 

1935 yyyy-mm-dd hh:mi:ss.mmm(24h) 

1936 

1937 For other data types, call the base class implementation. 

1938 """ 

1939 # datetime and date are both subclasses of datetime.date 

1940 if issubclass(type(value), datetime.date): 

1941 # SQL Server wants single quotes around the date string. 

1942 return "'" + str(value) + "'" 

1943 else: 

1944 return super(MSSQLStrictCompiler, self).render_literal_value( 

1945 value, type_ 

1946 ) 

1947 

1948 

1949class MSDDLCompiler(compiler.DDLCompiler): 

1950 def get_column_specification(self, column, **kwargs): 

1951 colspec = self.preparer.format_column(column) 

1952 

1953 # type is not accepted in a computed column 

1954 if column.computed is not None: 

1955 colspec += " " + self.process(column.computed) 

1956 else: 

1957 colspec += " " + self.dialect.type_compiler.process( 

1958 column.type, type_expression=column 

1959 ) 

1960 

1961 if column.nullable is not None: 

1962 if ( 

1963 not column.nullable 

1964 or column.primary_key 

1965 or isinstance(column.default, sa_schema.Sequence) 

1966 or column.autoincrement is True 

1967 ): 

1968 colspec += " NOT NULL" 

1969 elif column.computed is None: 

1970 # don't specify "NULL" for computed columns 

1971 colspec += " NULL" 

1972 

1973 if column.table is None: 

1974 raise exc.CompileError( 

1975 "mssql requires Table-bound columns " 

1976 "in order to generate DDL" 

1977 ) 

1978 

1979 # install an IDENTITY Sequence if we either a sequence or an implicit 

1980 # IDENTITY column 

1981 if isinstance(column.default, sa_schema.Sequence): 

1982 

1983 if ( 

1984 column.default.start is not None 

1985 or column.default.increment is not None 

1986 or column is not column.table._autoincrement_column 

1987 ): 

1988 util.warn_deprecated( 

1989 "Use of Sequence with SQL Server in order to affect the " 

1990 "parameters of the IDENTITY value is deprecated, as " 

1991 "Sequence " 

1992 "will correspond to an actual SQL Server " 

1993 "CREATE SEQUENCE in " 

1994 "a future release. Please use the mssql_identity_start " 

1995 "and mssql_identity_increment parameters." 

1996 ) 

1997 if column.default.start == 0: 

1998 start = 0 

1999 else: 

2000 start = column.default.start or 1 

2001 

2002 colspec += " IDENTITY(%s,%s)" % ( 

2003 start, 

2004 column.default.increment or 1, 

2005 ) 

2006 elif ( 

2007 column is column.table._autoincrement_column 

2008 or column.autoincrement is True 

2009 ): 

2010 start = column.dialect_options["mssql"]["identity_start"] 

2011 increment = column.dialect_options["mssql"]["identity_increment"] 

2012 colspec += " IDENTITY(%s,%s)" % (start, increment) 

2013 else: 

2014 default = self.get_column_default_string(column) 

2015 if default is not None: 

2016 colspec += " DEFAULT " + default 

2017 

2018 return colspec 

2019 

2020 def visit_create_index(self, create, include_schema=False): 

2021 index = create.element 

2022 self._verify_index_table(index) 

2023 preparer = self.preparer 

2024 text = "CREATE " 

2025 if index.unique: 

2026 text += "UNIQUE " 

2027 

2028 # handle clustering option 

2029 clustered = index.dialect_options["mssql"]["clustered"] 

2030 if clustered is not None: 

2031 if clustered: 

2032 text += "CLUSTERED " 

2033 else: 

2034 text += "NONCLUSTERED " 

2035 

2036 text += "INDEX %s ON %s (%s)" % ( 

2037 self._prepared_index_name(index, include_schema=include_schema), 

2038 preparer.format_table(index.table), 

2039 ", ".join( 

2040 self.sql_compiler.process( 

2041 expr, include_table=False, literal_binds=True 

2042 ) 

2043 for expr in index.expressions 

2044 ), 

2045 ) 

2046 

2047 whereclause = index.dialect_options["mssql"]["where"] 

2048 

2049 if whereclause is not None: 

2050 where_compiled = self.sql_compiler.process( 

2051 whereclause, include_table=False, literal_binds=True 

2052 ) 

2053 text += " WHERE " + where_compiled 

2054 

2055 # handle other included columns 

2056 if index.dialect_options["mssql"]["include"]: 

2057 inclusions = [ 

2058 index.table.c[col] 

2059 if isinstance(col, util.string_types) 

2060 else col 

2061 for col in index.dialect_options["mssql"]["include"] 

2062 ] 

2063 

2064 text += " INCLUDE (%s)" % ", ".join( 

2065 [preparer.quote(c.name) for c in inclusions] 

2066 ) 

2067 

2068 return text 

2069 

2070 def visit_drop_index(self, drop): 

2071 return "\nDROP INDEX %s ON %s" % ( 

2072 self._prepared_index_name(drop.element, include_schema=False), 

2073 self.preparer.format_table(drop.element.table), 

2074 ) 

2075 

2076 def visit_primary_key_constraint(self, constraint): 

2077 if len(constraint) == 0: 

2078 return "" 

2079 text = "" 

2080 if constraint.name is not None: 

2081 text += "CONSTRAINT %s " % self.preparer.format_constraint( 

2082 constraint 

2083 ) 

2084 text += "PRIMARY KEY " 

2085 

2086 clustered = constraint.dialect_options["mssql"]["clustered"] 

2087 if clustered is not None: 

2088 if clustered: 

2089 text += "CLUSTERED " 

2090 else: 

2091 text += "NONCLUSTERED " 

2092 

2093 text += "(%s)" % ", ".join( 

2094 self.preparer.quote(c.name) for c in constraint 

2095 ) 

2096 text += self.define_constraint_deferrability(constraint) 

2097 return text 

2098 

2099 def visit_unique_constraint(self, constraint): 

2100 if len(constraint) == 0: 

2101 return "" 

2102 text = "" 

2103 if constraint.name is not None: 

2104 formatted_name = self.preparer.format_constraint(constraint) 

2105 if formatted_name is not None: 

2106 text += "CONSTRAINT %s " % formatted_name 

2107 text += "UNIQUE " 

2108 

2109 clustered = constraint.dialect_options["mssql"]["clustered"] 

2110 if clustered is not None: 

2111 if clustered: 

2112 text += "CLUSTERED " 

2113 else: 

2114 text += "NONCLUSTERED " 

2115 

2116 text += "(%s)" % ", ".join( 

2117 self.preparer.quote(c.name) for c in constraint 

2118 ) 

2119 text += self.define_constraint_deferrability(constraint) 

2120 return text 

2121 

2122 def visit_computed_column(self, generated): 

2123 text = "AS (%s)" % self.sql_compiler.process( 

2124 generated.sqltext, include_table=False, literal_binds=True 

2125 ) 

2126 # explicitly check for True|False since None means server default 

2127 if generated.persisted is True: 

2128 text += " PERSISTED" 

2129 return text 

2130 

2131 

2132class MSIdentifierPreparer(compiler.IdentifierPreparer): 

2133 reserved_words = RESERVED_WORDS 

2134 

2135 def __init__(self, dialect): 

2136 super(MSIdentifierPreparer, self).__init__( 

2137 dialect, 

2138 initial_quote="[", 

2139 final_quote="]", 

2140 quote_case_sensitive_collations=False, 

2141 ) 

2142 

2143 def _escape_identifier(self, value): 

2144 return value 

2145 

2146 def quote_schema(self, schema, force=None): 

2147 """Prepare a quoted table and schema name.""" 

2148 

2149 # need to re-implement the deprecation warning entirely 

2150 if force is not None: 

2151 # not using the util.deprecated_params() decorator in this 

2152 # case because of the additional function call overhead on this 

2153 # very performance-critical spot. 

2154 util.warn_deprecated( 

2155 "The IdentifierPreparer.quote_schema.force parameter is " 

2156 "deprecated and will be removed in a future release. This " 

2157 "flag has no effect on the behavior of the " 

2158 "IdentifierPreparer.quote method; please refer to " 

2159 "quoted_name()." 

2160 ) 

2161 

2162 dbname, owner = _schema_elements(schema) 

2163 if dbname: 

2164 result = "%s.%s" % (self.quote(dbname), self.quote(owner)) 

2165 elif owner: 

2166 result = self.quote(owner) 

2167 else: 

2168 result = "" 

2169 return result 

2170 

2171 

2172def _db_plus_owner_listing(fn): 

2173 def wrap(dialect, connection, schema=None, **kw): 

2174 dbname, owner = _owner_plus_db(dialect, schema) 

2175 return _switch_db( 

2176 dbname, 

2177 connection, 

2178 fn, 

2179 dialect, 

2180 connection, 

2181 dbname, 

2182 owner, 

2183 schema, 

2184 **kw 

2185 ) 

2186 

2187 return update_wrapper(wrap, fn) 

2188 

2189 

2190def _db_plus_owner(fn): 

2191 def wrap(dialect, connection, tablename, schema=None, **kw): 

2192 dbname, owner = _owner_plus_db(dialect, schema) 

2193 return _switch_db( 

2194 dbname, 

2195 connection, 

2196 fn, 

2197 dialect, 

2198 connection, 

2199 tablename, 

2200 dbname, 

2201 owner, 

2202 schema, 

2203 **kw 

2204 ) 

2205 

2206 return update_wrapper(wrap, fn) 

2207 

2208 

2209def _switch_db(dbname, connection, fn, *arg, **kw): 

2210 if dbname: 

2211 current_db = connection.scalar("select db_name()") 

2212 if current_db != dbname: 

2213 connection.execute( 

2214 "use %s" % connection.dialect.identifier_preparer.quote(dbname) 

2215 ) 

2216 try: 

2217 return fn(*arg, **kw) 

2218 finally: 

2219 if dbname and current_db != dbname: 

2220 connection.execute( 

2221 "use %s" 

2222 % connection.dialect.identifier_preparer.quote(current_db) 

2223 ) 

2224 

2225 

2226def _owner_plus_db(dialect, schema): 

2227 if not schema: 

2228 return None, dialect.default_schema_name 

2229 elif "." in schema: 

2230 return _schema_elements(schema) 

2231 else: 

2232 return None, schema 

2233 

2234 

2235_memoized_schema = util.LRUCache() 

2236 

2237 

2238def _schema_elements(schema): 

2239 if isinstance(schema, quoted_name) and schema.quote: 

2240 return None, schema 

2241 

2242 if schema in _memoized_schema: 

2243 return _memoized_schema[schema] 

2244 

2245 # tests for this function are in: 

2246 # test/dialect/mssql/test_reflection.py -> 

2247 # OwnerPlusDBTest.test_owner_database_pairs 

2248 # test/dialect/mssql/test_compiler.py -> test_force_schema_* 

2249 # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_* 

2250 # 

2251 

2252 push = [] 

2253 symbol = "" 

2254 bracket = False 

2255 has_brackets = False 

2256 for token in re.split(r"(\[|\]|\.)", schema): 

2257 if not token: 

2258 continue 

2259 if token == "[": 

2260 bracket = True 

2261 has_brackets = True 

2262 elif token == "]": 

2263 bracket = False 

2264 elif not bracket and token == ".": 

2265 if has_brackets: 

2266 push.append("[%s]" % symbol) 

2267 else: 

2268 push.append(symbol) 

2269 symbol = "" 

2270 has_brackets = False 

2271 else: 

2272 symbol += token 

2273 if symbol: 

2274 push.append(symbol) 

2275 if len(push) > 1: 

2276 dbname, owner = ".".join(push[0:-1]), push[-1] 

2277 

2278 # test for internal brackets 

2279 if re.match(r".*\].*\[.*", dbname[1:-1]): 

2280 dbname = quoted_name(dbname, quote=False) 

2281 else: 

2282 dbname = dbname.lstrip("[").rstrip("]") 

2283 

2284 elif len(push): 

2285 dbname, owner = None, push[0] 

2286 else: 

2287 dbname, owner = None, None 

2288 

2289 _memoized_schema[schema] = dbname, owner 

2290 return dbname, owner 

2291 

2292 

2293class MSDialect(default.DefaultDialect): 

2294 name = "mssql" 

2295 supports_default_values = True 

2296 supports_empty_insert = False 

2297 execution_ctx_cls = MSExecutionContext 

2298 use_scope_identity = True 

2299 max_identifier_length = 128 

2300 schema_name = "dbo" 

2301 

2302 colspecs = { 

2303 sqltypes.DateTime: _MSDateTime, 

2304 sqltypes.Date: _MSDate, 

2305 sqltypes.Time: TIME, 

2306 sqltypes.Unicode: _MSUnicode, 

2307 sqltypes.UnicodeText: _MSUnicodeText, 

2308 } 

2309 

2310 engine_config_types = default.DefaultDialect.engine_config_types.union( 

2311 [("legacy_schema_aliasing", util.asbool)] 

2312 ) 

2313 

2314 ischema_names = ischema_names 

2315 

2316 supports_native_boolean = False 

2317 non_native_boolean_check_constraint = False 

2318 supports_unicode_binds = True 

2319 postfetch_lastrowid = True 

2320 _supports_nvarchar_max = False 

2321 

2322 server_version_info = () 

2323 

2324 statement_compiler = MSSQLCompiler 

2325 ddl_compiler = MSDDLCompiler 

2326 type_compiler = MSTypeCompiler 

2327 preparer = MSIdentifierPreparer 

2328 

2329 construct_arguments = [ 

2330 (sa_schema.PrimaryKeyConstraint, {"clustered": None}), 

2331 (sa_schema.UniqueConstraint, {"clustered": None}), 

2332 (sa_schema.Index, {"clustered": None, "include": None, "where": None}), 

2333 (sa_schema.Column, {"identity_start": 1, "identity_increment": 1}), 

2334 ] 

2335 

2336 def __init__( 

2337 self, 

2338 query_timeout=None, 

2339 use_scope_identity=True, 

2340 schema_name="dbo", 

2341 isolation_level=None, 

2342 deprecate_large_types=None, 

2343 legacy_schema_aliasing=False, 

2344 **opts 

2345 ): 

2346 self.query_timeout = int(query_timeout or 0) 

2347 self.schema_name = schema_name 

2348 

2349 self.use_scope_identity = use_scope_identity 

2350 self.deprecate_large_types = deprecate_large_types 

2351 self.legacy_schema_aliasing = legacy_schema_aliasing 

2352 

2353 super(MSDialect, self).__init__(**opts) 

2354 

2355 self.isolation_level = isolation_level 

2356 

2357 def do_savepoint(self, connection, name): 

2358 # give the DBAPI a push 

2359 connection.execute("IF @@TRANCOUNT = 0 BEGIN TRANSACTION") 

2360 super(MSDialect, self).do_savepoint(connection, name) 

2361 

2362 def do_release_savepoint(self, connection, name): 

2363 # SQL Server does not support RELEASE SAVEPOINT 

2364 pass 

2365 

2366 _isolation_lookup = set( 

2367 [ 

2368 "SERIALIZABLE", 

2369 "READ UNCOMMITTED", 

2370 "READ COMMITTED", 

2371 "REPEATABLE READ", 

2372 "SNAPSHOT", 

2373 ] 

2374 ) 

2375 

2376 def set_isolation_level(self, connection, level): 

2377 level = level.replace("_", " ") 

2378 if level not in self._isolation_lookup: 

2379 raise exc.ArgumentError( 

2380 "Invalid value '%s' for isolation_level. " 

2381 "Valid isolation levels for %s are %s" 

2382 % (level, self.name, ", ".join(self._isolation_lookup)) 

2383 ) 

2384 cursor = connection.cursor() 

2385 cursor.execute("SET TRANSACTION ISOLATION LEVEL %s" % level) 

2386 cursor.close() 

2387 if level == "SNAPSHOT": 

2388 connection.commit() 

2389 

2390 def get_isolation_level(self, connection): 

2391 if self.server_version_info < MS_2005_VERSION: 

2392 raise NotImplementedError( 

2393 "Can't fetch isolation level prior to SQL Server 2005" 

2394 ) 

2395 

2396 last_error = None 

2397 

2398 views = ("sys.dm_exec_sessions", "sys.dm_pdw_nodes_exec_sessions") 

2399 for view in views: 

2400 cursor = connection.cursor() 

2401 try: 

2402 cursor.execute( 

2403 """ 

2404 SELECT CASE transaction_isolation_level 

2405 WHEN 0 THEN NULL 

2406 WHEN 1 THEN 'READ UNCOMMITTED' 

2407 WHEN 2 THEN 'READ COMMITTED' 

2408 WHEN 3 THEN 'REPEATABLE READ' 

2409 WHEN 4 THEN 'SERIALIZABLE' 

2410 WHEN 5 THEN 'SNAPSHOT' END AS TRANSACTION_ISOLATION_LEVEL 

2411 FROM %s 

2412 where session_id = @@SPID 

2413 """ 

2414 % view 

2415 ) 

2416 val = cursor.fetchone()[0] 

2417 except self.dbapi.Error as err: 

2418 # Python3 scoping rules 

2419 last_error = err 

2420 continue 

2421 else: 

2422 return val.upper() 

2423 finally: 

2424 cursor.close() 

2425 else: 

2426 # note that the NotImplementedError is caught by 

2427 # DefaultDialect, so the warning here is all that displays 

2428 util.warn( 

2429 "Could not fetch transaction isolation level, " 

2430 "tried views: %s; final error was: %s" % (views, last_error) 

2431 ) 

2432 raise NotImplementedError( 

2433 "Can't fetch isolation level on this particular " 

2434 "SQL Server version. tried views: %s; final error was: %s" 

2435 % (views, last_error) 

2436 ) 

2437 

2438 def initialize(self, connection): 

2439 super(MSDialect, self).initialize(connection) 

2440 self._setup_version_attributes() 

2441 self._setup_supports_nvarchar_max(connection) 

2442 

2443 def on_connect(self): 

2444 if self.isolation_level is not None: 

2445 

2446 def connect(conn): 

2447 self.set_isolation_level(conn, self.isolation_level) 

2448 

2449 return connect 

2450 else: 

2451 return None 

2452 

2453 def _setup_version_attributes(self): 

2454 if self.server_version_info[0] not in list(range(8, 17)): 

2455 util.warn( 

2456 "Unrecognized server version info '%s'. Some SQL Server " 

2457 "features may not function properly." 

2458 % ".".join(str(x) for x in self.server_version_info) 

2459 ) 

2460 if ( 

2461 self.server_version_info >= MS_2005_VERSION 

2462 and "implicit_returning" not in self.__dict__ 

2463 ): 

2464 self.implicit_returning = True 

2465 if self.server_version_info >= MS_2008_VERSION: 

2466 self.supports_multivalues_insert = True 

2467 if self.deprecate_large_types is None: 

2468 self.deprecate_large_types = ( 

2469 self.server_version_info >= MS_2012_VERSION 

2470 ) 

2471 

2472 def _setup_supports_nvarchar_max(self, connection): 

2473 try: 

2474 connection.scalar( 

2475 sql.text("SELECT CAST('test max support' AS NVARCHAR(max))") 

2476 ) 

2477 except exc.DBAPIError: 

2478 self._supports_nvarchar_max = False 

2479 else: 

2480 self._supports_nvarchar_max = True 

2481 

2482 def _get_default_schema_name(self, connection): 

2483 if self.server_version_info < MS_2005_VERSION: 

2484 return self.schema_name 

2485 else: 

2486 query = sql.text("SELECT schema_name()") 

2487 default_schema_name = connection.scalar(query) 

2488 if default_schema_name is not None: 

2489 # guard against the case where the default_schema_name is being 

2490 # fed back into a table reflection function. 

2491 return quoted_name(default_schema_name, quote=True) 

2492 else: 

2493 return self.schema_name 

2494 

2495 @_db_plus_owner 

2496 def has_table(self, connection, tablename, dbname, owner, schema): 

2497 columns = ischema.columns 

2498 

2499 whereclause = columns.c.table_name == tablename 

2500 

2501 if owner: 

2502 whereclause = sql.and_( 

2503 whereclause, columns.c.table_schema == owner 

2504 ) 

2505 s = sql.select([columns], whereclause) 

2506 c = connection.execute(s) 

2507 return c.first() is not None 

2508 

2509 @reflection.cache 

2510 def get_schema_names(self, connection, **kw): 

2511 s = sql.select( 

2512 [ischema.schemata.c.schema_name], 

2513 order_by=[ischema.schemata.c.schema_name], 

2514 ) 

2515 schema_names = [r[0] for r in connection.execute(s)] 

2516 return schema_names 

2517 

2518 @reflection.cache 

2519 @_db_plus_owner_listing 

2520 def get_table_names(self, connection, dbname, owner, schema, **kw): 

2521 tables = ischema.tables 

2522 s = sql.select( 

2523 [tables.c.table_name], 

2524 sql.and_( 

2525 tables.c.table_schema == owner, 

2526 tables.c.table_type == "BASE TABLE", 

2527 ), 

2528 order_by=[tables.c.table_name], 

2529 ) 

2530 table_names = [r[0] for r in connection.execute(s)] 

2531 return table_names 

2532 

2533 @reflection.cache 

2534 @_db_plus_owner_listing 

2535 def get_view_names(self, connection, dbname, owner, schema, **kw): 

2536 tables = ischema.tables 

2537 s = sql.select( 

2538 [tables.c.table_name], 

2539 sql.and_( 

2540 tables.c.table_schema == owner, tables.c.table_type == "VIEW" 

2541 ), 

2542 order_by=[tables.c.table_name], 

2543 ) 

2544 view_names = [r[0] for r in connection.execute(s)] 

2545 return view_names 

2546 

2547 @reflection.cache 

2548 @_db_plus_owner 

2549 def get_indexes(self, connection, tablename, dbname, owner, schema, **kw): 

2550 # using system catalogs, don't support index reflection 

2551 # below MS 2005 

2552 if self.server_version_info < MS_2005_VERSION: 

2553 return [] 

2554 

2555 rp = connection.execute( 

2556 sql.text( 

2557 "select ind.index_id, ind.is_unique, ind.name " 

2558 "from sys.indexes as ind join sys.tables as tab on " 

2559 "ind.object_id=tab.object_id " 

2560 "join sys.schemas as sch on sch.schema_id=tab.schema_id " 

2561 "where tab.name = :tabname " 

2562 "and sch.name=:schname " 

2563 "and ind.is_primary_key=0 and ind.type != 0" 

2564 ) 

2565 .bindparams( 

2566 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), 

2567 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

2568 ) 

2569 .columns(name=sqltypes.Unicode()) 

2570 ) 

2571 indexes = {} 

2572 for row in rp: 

2573 indexes[row["index_id"]] = { 

2574 "name": row["name"], 

2575 "unique": row["is_unique"] == 1, 

2576 "column_names": [], 

2577 } 

2578 rp = connection.execute( 

2579 sql.text( 

2580 "select ind_col.index_id, ind_col.object_id, col.name " 

2581 "from sys.columns as col " 

2582 "join sys.tables as tab on tab.object_id=col.object_id " 

2583 "join sys.index_columns as ind_col on " 

2584 "(ind_col.column_id=col.column_id and " 

2585 "ind_col.object_id=tab.object_id) " 

2586 "join sys.schemas as sch on sch.schema_id=tab.schema_id " 

2587 "where tab.name=:tabname " 

2588 "and sch.name=:schname" 

2589 ) 

2590 .bindparams( 

2591 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), 

2592 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

2593 ) 

2594 .columns(name=sqltypes.Unicode()) 

2595 ) 

2596 for row in rp: 

2597 if row["index_id"] in indexes: 

2598 indexes[row["index_id"]]["column_names"].append(row["name"]) 

2599 

2600 return list(indexes.values()) 

2601 

2602 @reflection.cache 

2603 @_db_plus_owner 

2604 def get_view_definition( 

2605 self, connection, viewname, dbname, owner, schema, **kw 

2606 ): 

2607 rp = connection.execute( 

2608 sql.text( 

2609 "select definition from sys.sql_modules as mod, " 

2610 "sys.views as views, " 

2611 "sys.schemas as sch" 

2612 " where " 

2613 "mod.object_id=views.object_id and " 

2614 "views.schema_id=sch.schema_id and " 

2615 "views.name=:viewname and sch.name=:schname" 

2616 ).bindparams( 

2617 sql.bindparam("viewname", viewname, ischema.CoerceUnicode()), 

2618 sql.bindparam("schname", owner, ischema.CoerceUnicode()), 

2619 ) 

2620 ) 

2621 

2622 if rp: 

2623 view_def = rp.scalar() 

2624 return view_def 

2625 

2626 @reflection.cache 

2627 @_db_plus_owner 

2628 def get_columns(self, connection, tablename, dbname, owner, schema, **kw): 

2629 # Get base columns 

2630 columns = ischema.columns 

2631 computed_cols = ischema.computed_columns 

2632 if owner: 

2633 whereclause = sql.and_( 

2634 columns.c.table_name == tablename, 

2635 columns.c.table_schema == owner, 

2636 ) 

2637 table_fullname = "%s.%s" % (owner, tablename) 

2638 full_name = columns.c.table_schema + "." + columns.c.table_name 

2639 join_on = computed_cols.c.object_id == func.object_id(full_name) 

2640 else: 

2641 whereclause = columns.c.table_name == tablename 

2642 table_fullname = tablename 

2643 join_on = computed_cols.c.object_id == func.object_id( 

2644 columns.c.table_name 

2645 ) 

2646 

2647 join_on = sql.and_( 

2648 join_on, columns.c.column_name == computed_cols.c.name 

2649 ) 

2650 join = columns.join(computed_cols, onclause=join_on, isouter=True) 

2651 

2652 if self._supports_nvarchar_max: 

2653 computed_definition = computed_cols.c.definition 

2654 else: 

2655 # tds_version 4.2 does not support NVARCHAR(MAX) 

2656 computed_definition = sql.cast( 

2657 computed_cols.c.definition, NVARCHAR(4000) 

2658 ) 

2659 

2660 s = sql.select( 

2661 [columns, computed_definition, computed_cols.c.is_persisted], 

2662 whereclause, 

2663 from_obj=join, 

2664 order_by=[columns.c.ordinal_position], 

2665 ) 

2666 

2667 c = connection.execute(s) 

2668 cols = [] 

2669 

2670 while True: 

2671 row = c.fetchone() 

2672 if row is None: 

2673 break 

2674 name = row[columns.c.column_name] 

2675 type_ = row[columns.c.data_type] 

2676 nullable = row[columns.c.is_nullable] == "YES" 

2677 charlen = row[columns.c.character_maximum_length] 

2678 numericprec = row[columns.c.numeric_precision] 

2679 numericscale = row[columns.c.numeric_scale] 

2680 default = row[columns.c.column_default] 

2681 collation = row[columns.c.collation_name] 

2682 definition = row[computed_definition] 

2683 is_persisted = row[computed_cols.c.is_persisted] 

2684 

2685 coltype = self.ischema_names.get(type_, None) 

2686 

2687 kwargs = {} 

2688 if coltype in ( 

2689 MSString, 

2690 MSChar, 

2691 MSNVarchar, 

2692 MSNChar, 

2693 MSText, 

2694 MSNText, 

2695 MSBinary, 

2696 MSVarBinary, 

2697 sqltypes.LargeBinary, 

2698 ): 

2699 if charlen == -1: 

2700 charlen = None 

2701 kwargs["length"] = charlen 

2702 if collation: 

2703 kwargs["collation"] = collation 

2704 

2705 if coltype is None: 

2706 util.warn( 

2707 "Did not recognize type '%s' of column '%s'" 

2708 % (type_, name) 

2709 ) 

2710 coltype = sqltypes.NULLTYPE 

2711 else: 

2712 if issubclass(coltype, sqltypes.Numeric): 

2713 kwargs["precision"] = numericprec 

2714 

2715 if not issubclass(coltype, sqltypes.Float): 

2716 kwargs["scale"] = numericscale 

2717 

2718 coltype = coltype(**kwargs) 

2719 cdict = { 

2720 "name": name, 

2721 "type": coltype, 

2722 "nullable": nullable, 

2723 "default": default, 

2724 "autoincrement": False, 

2725 } 

2726 

2727 if definition is not None and is_persisted is not None: 

2728 cdict["computed"] = { 

2729 "sqltext": definition, 

2730 "persisted": is_persisted, 

2731 } 

2732 

2733 cols.append(cdict) 

2734 # autoincrement and identity 

2735 colmap = {} 

2736 for col in cols: 

2737 colmap[col["name"]] = col 

2738 # We also run an sp_columns to check for identity columns: 

2739 cursor = connection.execute( 

2740 "sp_columns @table_name = '%s', " 

2741 "@table_owner = '%s'" % (tablename, owner) 

2742 ) 

2743 ic = None 

2744 while True: 

2745 row = cursor.fetchone() 

2746 if row is None: 

2747 break 

2748 (col_name, type_name) = row[3], row[5] 

2749 if type_name.endswith("identity") and col_name in colmap: 

2750 ic = col_name 

2751 colmap[col_name]["autoincrement"] = True 

2752 colmap[col_name]["dialect_options"] = { 

2753 "mssql_identity_start": 1, 

2754 "mssql_identity_increment": 1, 

2755 } 

2756 break 

2757 cursor.close() 

2758 

2759 if ic is not None and self.server_version_info >= MS_2005_VERSION: 

2760 table_fullname = "%s.%s" % (owner, tablename) 

2761 cursor = connection.execute( 

2762 "select ident_seed('%s'), ident_incr('%s')" 

2763 % (table_fullname, table_fullname) 

2764 ) 

2765 

2766 row = cursor.first() 

2767 if row is not None and row[0] is not None: 

2768 colmap[ic]["dialect_options"].update( 

2769 { 

2770 "mssql_identity_start": int(row[0]), 

2771 "mssql_identity_increment": int(row[1]), 

2772 } 

2773 ) 

2774 return cols 

2775 

2776 @reflection.cache 

2777 @_db_plus_owner 

2778 def get_pk_constraint( 

2779 self, connection, tablename, dbname, owner, schema, **kw 

2780 ): 

2781 pkeys = [] 

2782 TC = ischema.constraints 

2783 C = ischema.key_constraints.alias("C") 

2784 

2785 # Primary key constraints 

2786 s = sql.select( 

2787 [C.c.column_name, TC.c.constraint_type, C.c.constraint_name], 

2788 sql.and_( 

2789 TC.c.constraint_name == C.c.constraint_name, 

2790 TC.c.table_schema == C.c.table_schema, 

2791 C.c.table_name == tablename, 

2792 C.c.table_schema == owner, 

2793 ), 

2794 ) 

2795 c = connection.execute(s) 

2796 constraint_name = None 

2797 for row in c: 

2798 if "PRIMARY" in row[TC.c.constraint_type.name]: 

2799 pkeys.append(row[0]) 

2800 if constraint_name is None: 

2801 constraint_name = row[C.c.constraint_name.name] 

2802 return {"constrained_columns": pkeys, "name": constraint_name} 

2803 

2804 @reflection.cache 

2805 @_db_plus_owner 

2806 def get_foreign_keys( 

2807 self, connection, tablename, dbname, owner, schema, **kw 

2808 ): 

2809 RR = ischema.ref_constraints 

2810 C = ischema.key_constraints.alias("C") 

2811 R = ischema.key_constraints.alias("R") 

2812 

2813 # Foreign key constraints 

2814 s = sql.select( 

2815 [ 

2816 C.c.column_name, 

2817 R.c.table_schema, 

2818 R.c.table_name, 

2819 R.c.column_name, 

2820 RR.c.constraint_name, 

2821 RR.c.match_option, 

2822 RR.c.update_rule, 

2823 RR.c.delete_rule, 

2824 ], 

2825 sql.and_( 

2826 C.c.table_name == tablename, 

2827 C.c.table_schema == owner, 

2828 RR.c.constraint_schema == C.c.table_schema, 

2829 C.c.constraint_name == RR.c.constraint_name, 

2830 R.c.constraint_name == RR.c.unique_constraint_name, 

2831 R.c.constraint_schema == RR.c.unique_constraint_schema, 

2832 C.c.ordinal_position == R.c.ordinal_position, 

2833 ), 

2834 order_by=[RR.c.constraint_name, R.c.ordinal_position], 

2835 ) 

2836 

2837 # group rows by constraint ID, to handle multi-column FKs 

2838 fkeys = [] 

2839 

2840 def fkey_rec(): 

2841 return { 

2842 "name": None, 

2843 "constrained_columns": [], 

2844 "referred_schema": None, 

2845 "referred_table": None, 

2846 "referred_columns": [], 

2847 } 

2848 

2849 fkeys = util.defaultdict(fkey_rec) 

2850 

2851 for r in connection.execute(s).fetchall(): 

2852 scol, rschema, rtbl, rcol, rfknm, fkmatch, fkuprule, fkdelrule = r 

2853 

2854 rec = fkeys[rfknm] 

2855 rec["name"] = rfknm 

2856 if not rec["referred_table"]: 

2857 rec["referred_table"] = rtbl 

2858 if schema is not None or owner != rschema: 

2859 if dbname: 

2860 rschema = dbname + "." + rschema 

2861 rec["referred_schema"] = rschema 

2862 

2863 local_cols, remote_cols = ( 

2864 rec["constrained_columns"], 

2865 rec["referred_columns"], 

2866 ) 

2867 

2868 local_cols.append(scol) 

2869 remote_cols.append(rcol) 

2870 

2871 return list(fkeys.values())