Coverage for cc_modules/cc_sqla_coltypes.py: 66%

491 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 15:51 +0100

1""" 

2camcops_server/cc_modules/cc_sqla_coltypes.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**SQLAlchemy column types used by CamCOPS.** 

27 

28Note these built-in SQLAlchemy types 

29(https://docs.sqlalchemy.org/en/latest/core/type_basics.html#generic-types): 

30 

31 =============== =========================================================== 

32 SQLAlchemy type Comment 

33 =============== =========================================================== 

34 BigInteger MySQL: -9,223,372,036,854,775,808 to 

35 9,223,372,036,854,775,807 (64-bit) 

36 (compare NHS number: up to 9,999,999,999) 

37 Boolean 

38 Date 

39 DateTime 

40 Enum 

41 Float 

42 Integer MySQL: -2,147,483,648 to 2,147,483,647 (32-bit) 

43 Interval For ``datetime.timedelta`` 

44 LargeBinary Under MySQL, maps to ``BLOB`` 

45 MatchType For the return type of the ``MATCH`` operator 

46 Numeric For fixed-precision numbers like ``NUMERIC`` or ``DECIMAL`` 

47 PickleType 

48 SchemaType 

49 SmallInteger 

50 String ``VARCHAR`` 

51 Text Variably sized string type. 

52 (Under MySQL, renders as ``TEXT``.) 

53 Time 

54 Unicode Implies that the underlying column explicitly supports 

55 Unicode 

56 UnicodeText Variably sized version of Unicode 

57 (Under MySQL, renders as ``TEXT`` too.) 

58 =============== =========================================================== 

59 

60Not supported across all platforms: 

61 

62 =============== =========================================================== 

63 SQL type Comment 

64 =============== =========================================================== 

65 BIGINT UNSIGNED MySQL: 0 to 18,446,744,073,709,551,615 (64-bit). 

66 Use ``sqlalchemy.dialects.mysql.BIGINT(unsigned=True)``. 

67 INT UNSIGNED MySQL: 0 to 4,294,967,295 (32-bit). 

68 Use ``sqlalchemy.dialects.mysql.INTEGER(unsigned=True)``. 

69 =============== =========================================================== 

70 

71Other MySQL sizes: 

72 

73 =============== =========================================================== 

74 MySQL type Comment 

75 =============== =========================================================== 

76 TINYBLOB 2^8 bytes = 256 bytes 

77 BLOB 2^16 bytes = 64 KiB 

78 MEDIUMBLOB 2^24 bytes = 16 MiB 

79 LONGBLOB 2^32 bytes = 4 GiB 

80 TINYTEXT 255 (2^8 - 1) bytes 

81 TEXT 65,535 bytes (2^16 - 1) = 64 KiB 

82 MEDIUMTEXT 16,777,215 (2^24 - 1) bytes = 16 MiB 

83 LONGTEXT 4,294,967,295 (2^32 - 1) bytes = 4 GiB 

84 =============== =========================================================== 

85 

86See https://stackoverflow.com/questions/13932750/tinytext-text-mediumtext-and-longtext-maximum-storage-sizes. 

87 

88Also notes: 

89 

90- Columns may need their character set specified explicitly under MySQL: 

91 https://stackoverflow.com/questions/2108824/mysql-incorrect-string-value-error-when-save-unicode-string-in-django 

92 

93""" # noqa 

94 

95# ============================================================================= 

96# Imports 

97# ============================================================================= 

98 

99import json 

100import logging 

101from typing import ( 

102 Any, 

103 Generator, 

104 List, 

105 NoReturn, 

106 Optional, 

107 Sequence, 

108 Tuple, 

109 Type, 

110 TYPE_CHECKING, 

111 Union, 

112) 

113import uuid 

114 

115from cardinal_pythonlib.datetimefunc import ( 

116 coerce_to_pendulum, 

117 convert_datetime_to_utc, 

118 duration_from_iso, 

119 duration_to_iso, 

120 PotentialDatetimeType, 

121) 

122from cardinal_pythonlib.lists import chunks 

123from cardinal_pythonlib.logs import BraceStyleAdapter 

124from cardinal_pythonlib.reprfunc import auto_repr 

125from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName 

126from cardinal_pythonlib.sqlalchemy.orm_inspect import ( 

127 gen_columns, 

128 gen_relationships, 

129) 

130from cardinal_pythonlib.sqlalchemy.sqlfunc import ( 

131 fail_unknown_dialect, 

132 fetch_processed_single_clause, 

133) 

134from isodate.isoerror import ISO8601Error 

135from pendulum import DateTime as Pendulum, Duration 

136from pendulum.parsing.exceptions import ParserError 

137import phonenumbers 

138from semantic_version import Version 

139from sqlalchemy.dialects import mysql 

140from sqlalchemy.engine.interfaces import Dialect 

141from sqlalchemy.ext.compiler import compiles 

142from sqlalchemy.orm import mapped_column, MappedColumn 

143from sqlalchemy.orm.relationships import RelationshipProperty 

144from sqlalchemy.sql.elements import conv 

145from sqlalchemy.sql.expression import text 

146from sqlalchemy.sql.functions import FunctionElement 

147from sqlalchemy.sql.schema import Column 

148from sqlalchemy.sql.sqltypes import ( 

149 Boolean, 

150 CHAR, 

151 DateTime, 

152 LargeBinary, 

153 String, 

154 Text, 

155 Unicode, 

156 UnicodeText, 

157) 

158from sqlalchemy.sql.type_api import TypeDecorator 

159 

160from camcops_server.cc_modules.cc_constants import PV, StringLengths 

161from camcops_server.cc_modules.cc_simpleobjects import IdNumReference 

162from camcops_server.cc_modules.cc_sqlalchemy import ( 

163 LONG_COLUMN_NAME_WARNING_LIMIT, 

164) 

165from camcops_server.cc_modules.cc_version import make_version 

166 

167if TYPE_CHECKING: 

168 from sqlalchemy.sql.elements import ClauseElement 

169 from sqlalchemy.sql.compiler import SQLCompiler 

170 from sqlalchemy.sql.type_api import _CT, ColumnElement, OperatorType 

171 from camcops_server.cc_modules.cc_db import ( 

172 GenericTabletRecordMixin, 

173 ) 

174 

175log = BraceStyleAdapter(logging.getLogger(__name__)) 

176 

177 

178# ============================================================================= 

179# Debugging options 

180# ============================================================================= 

181 

# Debugging switches, all off by default. (For example,
# DEBUG_DATETIME_AS_ISO_TEXT gates extra log.debug() output in
# PendulumDateTimeAsIsoTextColType.)
DEBUG_DATETIME_AS_ISO_TEXT = False
DEBUG_DURATION_AS_ISO_TEXT = False
DEBUG_IDNUMDEF_LIST = False
DEBUG_INT_LIST_COLTYPE = False
DEBUG_SEMANTIC_VERSION = False
DEBUG_STRING_LIST_COLTYPE = False

# Warn at import time if any debugging switch has been left on.
if (
    DEBUG_DATETIME_AS_ISO_TEXT
    or DEBUG_DURATION_AS_ISO_TEXT
    or DEBUG_SEMANTIC_VERSION
    or DEBUG_IDNUMDEF_LIST
    or DEBUG_INT_LIST_COLTYPE
    or DEBUG_STRING_LIST_COLTYPE
):
    log.warning("Debugging options enabled!")

200 

201 

202# ============================================================================= 

203# Constants 

204# ============================================================================= 

205 

206 

class RelationshipInfo:
    """
    Key names for the ``info`` (user-defined) dictionary parameter of
    SQLAlchemy ``relationship`` calls; see
    https://docs.sqlalchemy.org/en/latest/orm/relationship_api.html#sqlalchemy.orm.relationship.
    """

    IS_ANCILLARY = "is_ancillary"
    IS_BLOB = "is_blob"

216 

217 

218# ============================================================================= 

219# Simple derivative column types 

220# ============================================================================= 

221# If you insert something too long into a VARCHAR, it just gets truncated. 

222 

AuditSourceColType = String(length=StringLengths.AUDIT_SOURCE_MAX_LEN)

# Historical note: there used to be
#   BigIntUnsigned = Integer().with_variant(mysql.BIGINT(unsigned=True), 'mysql')
# All uses were changed to BigInteger on 2017-08-25, partly because Alembic
# broke on variants (Aug 2017) and partly because the type was nonstandard
# and unnecessary.

Base32ColType = String(length=StringLengths.BASE32_MAX_LEN)

CharColType = String(length=1)
CharsetColType = String(length=StringLengths.CHARSET_MAX_LEN)
CurrencyColType = Unicode(length=StringLengths.CURRENCY_MAX_LEN)

DatabaseTitleColType = Unicode(length=StringLengths.DATABASE_TITLE_MAX_LEN)
DeviceNameColType = String(length=StringLengths.DEVICE_NAME_MAX_LEN)
DiagnosticCodeColType = String(length=StringLengths.DIAGNOSTIC_CODE_MAX_LEN)

EmailAddressColType = Unicode(length=StringLengths.EMAIL_ADDRESS_MAX_LEN)
EraColType = String(length=StringLengths.ISO8601_DATETIME_STRING_MAX_LEN)
ExportRecipientNameColType = String(
    length=StringLengths.EXPORT_RECIPIENT_NAME_MAX_LEN
)
ExportTransmissionMethodColType = String(
    length=StringLengths.SENDING_FORMAT_MAX_LEN
)

FilterTextColType = Unicode(length=StringLengths.FILTER_TEXT_MAX_LEN)
FileSpecColType = Unicode(length=StringLengths.FILESPEC_MAX_LEN)
FullNameColType = Unicode(length=StringLengths.FULLNAME_MAX_LEN)

GroupDescriptionColType = Unicode(
    length=StringLengths.GROUP_DESCRIPTION_MAX_LEN
)
GroupNameColType = Unicode(length=StringLengths.GROUP_NAME_MAX_LEN)

HashedPasswordColType = String(length=StringLengths.HASHED_PW_MAX_LEN)
# On collation for the hashed password:
# - A bcrypt() hash is case-sensitive, so one might want a case-SENSITIVE
#   column, i.e. collation='utf8mb4_bin' passed to String() for MySQL.
# - That is MySQL-specific, though; SQLAlchemy (as of Oct 2017) doesn't
#   support database-specific *per-column* collations, and SQLite accepts
#   COLLATE commands but chokes on 'utf8mb4_bin'.
# - It doesn't matter: we always fetch the string from the database and do
#   the case-sensitive comparison in Python (see calls to
#   is_password_valid()). So we don't set a collation.
# - See further notes in cc_sqlalchemy.py
HL7AssigningAuthorityType = String(length=StringLengths.HL7_AA_MAX_LEN)
HL7IdTypeType = String(length=StringLengths.HL7_ID_TYPE_MAX_LEN)
HostnameColType = String(length=StringLengths.HOSTNAME_MAX_LEN)

IdDescriptorColType = Unicode(length=StringLengths.ID_DESCRIPTOR_MAX_LEN)
IdPolicyColType = String(length=StringLengths.ID_POLICY_MAX_LEN)
# (Not used:
#  IntUnsigned = Integer().with_variant(mysql.INTEGER(unsigned=True), 'mysql'))
IPAddressColType = String(length=StringLengths.IP_ADDRESS_MAX_LEN)
# ... a plain string. For a richer alternative, see e.g.
# http://sqlalchemy-utils.readthedocs.io/en/latest/_modules/sqlalchemy_utils/types/ip_address.html  # noqa

LanguageCodeColType = String(length=StringLengths.LANGUAGE_CODE_MAX_LEN)

# Large BLOB; see
# https://stackoverflow.com/questions/43791725/sqlalchemy-how-to-make-a-longblob-column-in-mysql  # noqa
# Generic LargeBinary, rendered as LONGBLOB under MySQL.
# noinspection PyTypeChecker
LongBlob = LargeBinary().with_variant(mysql.LONGBLOB, "mysql")
# Rejected alternative, because it doesn't translate to SQL Server:
# LongBlob = LargeBinary(length=LONGBLOB_LONGTEXT_MAX_LEN)

# Generic UnicodeText, rendered as LONGTEXT under MySQL.
# noinspection PyTypeChecker
LongText = UnicodeText().with_variant(mysql.LONGTEXT, "mysql")
# Rejected alternative, because it doesn't translate to SQL Server:
# LongText = UnicodeText(length=LONGBLOB_LONGTEXT_MAX_LEN)

MfaMethodColType = String(length=StringLengths.MFA_METHOD_MAX_LEN)
MimeTypeColType = String(length=StringLengths.MIMETYPE_MAX_LEN)

PatientNameColType = Unicode(length=StringLengths.PATIENT_NAME_MAX_LEN)

Rfc2822DateColType = String(length=StringLengths.RFC_2822_DATE_MAX_LEN)

SessionTokenColType = String(length=StringLengths.SESSION_TOKEN_MAX_LEN)
SexColType = String(length=1)
SummaryCategoryColType = String(
    length=StringLengths.TASK_SUMMARY_TEXT_FIELD_DEFAULT_MAX_LEN
)
# ... pretty generic

TableNameColType = String(length=StringLengths.TABLENAME_MAX_LEN)

UrlColType = String(length=StringLengths.URL_MAX_LEN)
UserNameCamcopsColType = String(length=StringLengths.USERNAME_CAMCOPS_MAX_LEN)
UserNameExternalColType = String(
    length=StringLengths.USERNAME_EXTERNAL_MAX_LEN
)

314 

315 

316# ============================================================================= 

317# Helper operations for PendulumDateTimeAsIsoTextColType 

318# ============================================================================= 

319# Database string format is e.g. 

320# 2013-07-24T20:04:07.123456+01:00 

321# 2013-07-24T20:04:07.123+01:00 

322# 0 1 2 3 } position in string; 1-based 

323# 12345678901234567890123456789012 } 

324# 

325# So: rightmost 6 characters are time zone; rest is date/time. 

326# leftmost 23 characters are time up to millisecond precision. 

327# overall length is typically 29 (milliseconds) or 32 (microseconds) 

328 

# Length of the timezone suffix (e.g. "+01:00") of our ISO-8601 strings.
_TZ_LEN = 6
# The UTC offset, as a quoted SQL string literal.
_UTC_TZ_LITERAL = "'+00:00'"
# SQLite STRFTIME() format, as a quoted SQL string literal. There is no "T"
# separator, so that a Python datetime will recognize the output.
_SQLITE_DATETIME_FMT_FOR_PYTHON = "'%Y-%m-%d %H:%M:%f'"

# Text lengths of native date/time values, used to tell them apart from our
# ISO-8601 strings (which have different lengths).
_MYSQL_DATETIME_LEN = 19
_SQLSERVER_DATETIME_LEN = 19
_SQLSERVER_DATETIME2_LEN = 27

336 

337 

338# ----------------------------------------------------------------------------- 

339# isotzdatetime_to_utcdatetime 

340# ----------------------------------------------------------------------------- 

341 

342 

343# noinspection PyPep8Naming 

class isotzdatetime_to_utcdatetime(FunctionElement):
    """
    SQL function element used by :class:`PendulumDateTimeAsIsoTextColType`.

    Wraps an expression holding our ISO-8601 text (with timezone) and yields
    a ``DATETIME`` in the UTC timezone.

    The SQL itself is produced by dialect-specific ``@compiles``
    implementations.
    """

    name = "isotzdatetime_to_utcdatetime"
    type = DateTime()
    inherit_cache = False

357 

358 

359# noinspection PyUnusedLocal 

@compiles(isotzdatetime_to_utcdatetime)
def isotzdatetime_to_utcdatetime_default(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> None:
    """
    Fallback compiler for :class:`isotzdatetime_to_utcdatetime`, used for
    dialects without a specific implementation: reports the failure via
    ``fail_unknown_dialect``.
    """
    task = "perform isotzdatetime_to_utcdatetime"
    fail_unknown_dialect(compiler, task)

368 

369 

370# noinspection PyUnusedLocal 

@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.MYSQL)
def isotzdatetime_to_utcdatetime_mysql(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    MySQL compiler for :class:`isotzdatetime_to_utcdatetime`.

    Builds ``CONVERT_TZ(STR_TO_DATE(<date/time part>, <format>),
    <timezone part>, '+00:00')``.

    For the format codes, see
    https://dev.mysql.com/doc/refman/5.5/en/date-and-time-functions.html#function_date-format
    (note that minutes are ``%i``).

    Returns:
        the SQL text, as a string
    """
    col = fetch_processed_single_clause(element, compiler)
    # Passing the format through text() deals with the necessary escaping of
    # "%" for the DBAPI.
    iso_format = compiler.process(text("'%Y-%m-%dT%H:%i:%S.%f'"))

    # The rightmost _TZ_LEN characters are the timezone; the rest is the
    # date/time.
    tz_text = f"RIGHT({col}, {_TZ_LEN})"
    local_text = f"LEFT({col}, LENGTH({col}) - {_TZ_LEN})"
    # STR_TO_DATE() returns a DATETIME if the string contains both date and
    # time components.
    local_datetime = f"STR_TO_DATE({local_text}, {iso_format})"
    return f"CONVERT_TZ({local_datetime}, {tz_text}, {_UTC_TZ_LITERAL})"

403 

404 

405# noinspection PyUnusedLocal 

@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.SQLITE)
def isotzdatetime_to_utcdatetime_sqlite(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    SQLite compiler for :class:`isotzdatetime_to_utcdatetime`.

    References:

    - https://sqlite.org/lang_corefunc.html#substr
    - https://sqlite.org/lang_datefunc.html
    - https://www.sqlite.org/lang_expr.html

    **Timezone handling**

    A time of 12:00+01:00 means e.g. midday BST, which is 11:00 UTC; that is,
    the displayed offset must be SUBTRACTED to reach UTC. No manual
    arithmetic is needed, though: if the string ends in a timezone,
    ``STRFTIME()`` converts it to UTC automatically. The format given to
    ``STRFTIME()`` is the OUTPUT format, chosen so that a Python datetime
    will recognize it (so no 'T').

    **Precision**

    The output looks like ``2018-06-01 00:00:00.000``: SQLite provides
    millisecond precision only (in general and via the ``%f`` argument to
    ``STRFTIME``).

    SQLAlchemy's own DATETIME support for SQLite
    (https://docs.sqlalchemy.org/en/13/dialects/sqlite.html?highlight=sqlite#sqlalchemy.dialects.sqlite.DATETIME)
    doesn't support timezones, so that doesn't help us.

    **Text comparison**

    SQLite compares these values as text (see
    :class:`camcops_server.tasks.core10.Core10ReportDateRangeTests`), so:

    .. code-block:: sql

        SELECT '2018-06-01 00:00:00.000' >= '2018-06-01 00:00:00.000000';  -- 0, false
        SELECT '2018-06-01 00:00:00.000' >= '2018-06-01 00:00:00.000';  -- 1, true

    Either the SQLite side must be given 6 decimal places, or the bind
    parameter must be cut to 3. We can't always control the bind parameter,
    so we append '000' to the SQLite side.

    Returns:
        the SQL text, as a string
    """  # noqa
    col = fetch_processed_single_clause(element, compiler)
    out_format = compiler.process(text(_SQLITE_DATETIME_FMT_FOR_PYTHON))
    millisecond_text = f"STRFTIME({out_format}, {col})"
    # Pad milliseconds so that the text looks like microseconds.
    return f"({millisecond_text} || '000')"

457 

458 

459# noinspection PyUnusedLocal 

@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.SQLSERVER)
def isotzdatetime_to_utcdatetime_sqlserver(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    SQL Server compiler for :class:`isotzdatetime_to_utcdatetime`.

    **Converting strings to DATETIME values**

    - ``CAST()`` is part of ANSI SQL.
    - ``CONVERT()`` is not, but has some extra formatting options.

    Both work, with milliseconds, on SQL Server 2005:

    .. code-block:: sql

        SELECT CAST('2001-01-31T21:30:49.123' AS DATETIME) AS via_cast,
               CONVERT(DATETIME, '2001-01-31T21:30:49.123') AS via_convert;

    Going beyond milliseconds (e.g. "...21:30.49.123456") doesn't fail
    gracefully; it is an error for both ``CAST`` and ``CONVERT``.

    ``DATETIME2`` accepts greater precision, but requires SQL Server 2008 or
    higher:

    .. code-block:: sql

        SELECT CAST('2001-01-31T21:30:49.123456' AS DATETIME2) AS via_cast,
               CONVERT(DATETIME2, '2001-01-31T21:30:49.123456') AS via_convert;

    However, ``CAST(x AS DATETIME2)`` silently ignores any timezone
    information in the string, and so does ``CONVERT(DATETIME2, x, {0 or
    1})``.

    **Converting between time zones**

    SQL Server 2005 has NO time zone support; e.g.
    https://stackoverflow.com/questions/3200827/how-to-convert-timezones-in-sql-server-2005.

    Available from SQL Server 2008
    (https://docs.microsoft.com/en-us/sql/t-sql/functions/todatetimeoffset-transact-sql):

    .. code-block:: none

        TODATETIMEOFFSET(expression, time_zone):
            expression: something that evaluates to a DATETIME2 value
            time_zone: integer minutes, or string hours/minutes e.g. "+13.00"
            -> produces a DATETIMEOFFSET value

        SWITCHOFFSET
            -> converts one DATETIMEOFFSET value to another, preserving its
               UTC time, but changing the displayed (local) time zone.

    Is ``SWITCHOFFSET`` unnecessary, given that we want a plain ``DATETIME2``
    in UTC?

    - ``CONVERT(DATETIME2, some_datetimeoffset, 1)`` converts to UTC
      automatically; see
      https://stackoverflow.com/questions/4953903/how-can-i-convert-a-sql-server-2008-datetimeoffset-to-a-datetime
    - ... but ``CAST(some_datetimeoffset AS DATETIME2)`` does not, and nor
      does ``CONVERT(DATETIME2, some_datetimeoffset, 0)``.
    - Styles 0 and 1 are the only ones permissible from SQL Server 2012 and
      up (empirically, and documented for the reverse direction at
      https://docs.microsoft.com/en-us/sql/t-sql/functions/cast-and-convert-transact-sql?view=sql-server-2017).
    - This is not properly documented re UTC conversion, as far as I can
      see. So we use ``SWITCHOFFSET -> CAST`` to be explicit and clear.

    ``AT TIME ZONE`` is available from SQL Server 2016 only:
    https://docs.microsoft.com/en-us/sql/t-sql/queries/at-time-zone-transact-sql?view=sql-server-2017

    **Therefore**

    - We require SQL Server 2008 or higher.
    - So we can use the ``DATETIME2`` type.
    - We use ``LEN()``; SQL Server does not support ``LENGTH()``.

    **Example (tested on SQL Server 2014)**

    .. code-block:: sql

        DECLARE @source AS VARCHAR(100) = '2001-01-31T21:30:49.123456+07:00';

        SELECT CAST(
            SWITCHOFFSET(
                TODATETIMEOFFSET(
                    CAST(LEFT(@source, LEN(@source) - 6) AS DATETIME2),
                    RIGHT(@source, 6)
                ),
                '+00:00'
            )
            AS DATETIME2
        )  -- 2001-01-31 14:30:49.1234560

    Returns:
        the SQL text, as a string
    """  # noqa
    col = fetch_processed_single_clause(element, compiler)

    # Split the string: timezone suffix, and everything before it.
    tz_text = f"RIGHT({col}, {_TZ_LEN})"  # a VARCHAR
    local_text = f"LEFT({col}, LEN({col}) - {_TZ_LEN})"  # a VARCHAR
    # Parse the date/time part, ignoring the timezone for now.
    local_dt2 = f"CAST({local_text} AS DATETIME2)"  # a DATETIME2
    # Attach the original offset.
    with_source_offset = f"TODATETIMEOFFSET({local_dt2}, {tz_text})"
    # ... a DATETIMEOFFSET
    # Shift to UTC, preserving the instant.
    with_utc_offset = f"SWITCHOFFSET({with_source_offset}, {_UTC_TZ_LITERAL})"
    # ... a DATETIMEOFFSET in UTC
    # Drop the offset, leaving a plain DATETIME2.
    return f"CAST({with_utc_offset} AS DATETIME2)"

577 

578 

579# ----------------------------------------------------------------------------- 

580# unknown_field_to_utcdatetime 

581# ----------------------------------------------------------------------------- 

582 

583 

584# noinspection PyPep8Naming 

class unknown_field_to_utcdatetime(FunctionElement):
    """
    SQL function element used by :class:`PendulumDateTimeAsIsoTextColType`.

    Wraps an expression of unknown kind -- it might be a ``DATETIME`` or one
    of our ISO-formatted text fields -- and yields a ``DATETIME`` in the UTC
    timezone.

    The SQL itself is produced by dialect-specific ``@compiles``
    implementations.
    """

    name = "unknown_field_to_utcdatetime"
    type = DateTime()
    inherit_cache = False

599 

600 

601# noinspection PyUnusedLocal 

@compiles(unknown_field_to_utcdatetime)
def unknown_field_to_utcdatetime_default(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> None:
    """
    Fallback compiler for :class:`unknown_field_to_utcdatetime`, used for
    dialects without a specific implementation: reports the failure via
    ``fail_unknown_dialect``.
    """
    task = "perform unknown_field_to_utcdatetime"
    fail_unknown_dialect(compiler, task)

610 

611 

612# noinspection PyUnusedLocal 

@compiles(unknown_field_to_utcdatetime, SqlaDialectName.MYSQL)
def unknown_field_to_utcdatetime_mysql(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    MySQL compiler for :class:`unknown_field_to_utcdatetime`.

    A value whose length is that of a plain ``DATETIME`` (e.g.
    ``2013-05-30 00:00:00``, 19 characters) is passed through unchanged;
    anything else is treated as ISO text and converted to a UTC
    ``DATETIME``.

    Returns:
        the SQL text, as a string
    """
    col = fetch_processed_single_clause(element, compiler)
    iso_as_utc = isotzdatetime_to_utcdatetime_mysql(element, compiler, **kw)
    return f"IF(LENGTH({col}) = {_MYSQL_DATETIME_LEN}, {col}, {iso_as_utc})"

628 

629 

630# noinspection PyUnusedLocal 

@compiles(unknown_field_to_utcdatetime, SqlaDialectName.SQLITE)
def unknown_field_to_utcdatetime_sqlite(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    Implementation of :class:`unknown_field_to_utcdatetime` for SQLite.

    ``STRFTIME()`` is applied to the value, whatever it is, producing text
    like ``2018-06-01 00:00:00.000`` (the ``%f`` format code gives
    milliseconds only).

    SQLite compares these values as text, and the expression that this one
    is compared against (:class:`isotzdatetime_to_utcdatetime`, for SQLite)
    is padded with ``'000'`` so as to look like microseconds. We therefore
    pad here as well; otherwise two equal instants would differ as strings
    (3 versus 6 decimal places) and e.g. ``==`` or ``<=`` would wrongly be
    false.

    Returns:
        the SQL text, as a string
    """
    x = fetch_processed_single_clause(element, compiler)
    fmt = compiler.process(text(_SQLITE_DATETIME_FMT_FOR_PYTHON))
    # Pad milliseconds to 6 decimal places, for consistent text comparison.
    result = f"(STRFTIME({fmt}, {x}) || '000')"
    # log.debug(result)
    return result

643 

644 

645# noinspection PyUnusedLocal 

@compiles(unknown_field_to_utcdatetime, SqlaDialectName.SQLSERVER)
def unknown_field_to_utcdatetime_sqlserver(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    SQL Server compiler for :class:`unknown_field_to_utcdatetime`.

    The value might be a ``DATETIME2`` as well as a ``DATETIME``. It seems
    consistent that ``LEN(DATETIME2) = 27``, with precision of a tenth of a
    microsecond, e.g. ``2001-01-31 21:30:49.1234567``.

    A value whose length matches either native type is passed through
    unchanged; anything else goes through our ISO-to-datetime conversion.

    This relies on neither ``_SQLSERVER_DATETIME_LEN`` nor
    ``_SQLSERVER_DATETIME2_LEN`` being the length of any of our ISO strings.

    Returns:
        the SQL text, as a string
    """
    col = fetch_processed_single_clause(element, compiler)
    iso_as_utc = isotzdatetime_to_utcdatetime_sqlserver(
        element, compiler, **kw
    )
    # "IN (...)" is used because CASE WHEN doesn't take a list of
    # alternatives directly:
    # https://stackoverflow.com/questions/5487892/sql-server-case-when-or-then-else-end-the-or-is-not-supported  # noqa
    native_lengths = f"({_SQLSERVER_DATETIME_LEN}, {_SQLSERVER_DATETIME2_LEN})"
    return (
        f"CASE WHEN LEN({col}) IN {native_lengths} THEN {col} "
        f"ELSE {iso_as_utc} "
        f"END"
    )

675 

676 

677# ============================================================================= 

678# Custom date/time field as ISO-8601 text including timezone, using 

679# pendulum.DateTime on the Python side. 

680# ============================================================================= 

681 

682 

class PendulumDateTimeAsIsoTextColType(TypeDecorator):
    """
    Stores date/time values as ISO-8601 text, in a specific format, e.g.
    ``2013-07-24T20:04:07.123456+01:00``.
    Uses Pendulum on the Python side.

    Comparisons involving a column of this type are rewritten by
    :class:`comparator_factory` so that both sides are UTC date/time values.
    """

    impl = String(length=StringLengths.ISO8601_DATETIME_STRING_MAX_LEN)
    # ... underlying SQL type

    cache_ok = False

    _coltype_name = "PendulumDateTimeAsIsoTextColType"  # for debug logging

    @property
    def python_type(self) -> type:
        """
        The Python type of the object.
        """
        return Pendulum

    @staticmethod
    def pendulum_to_isostring(x: PotentialDatetimeType) -> Optional[str]:
        """
        From a Python datetime to an ISO-formatted string in our particular
        format.

        Args:
            x: something that ``coerce_to_pendulum`` can handle

        Returns:
            a string like ``2013-07-24T20:04:07.123456+01:00``, or ``None``
            if the coerced value has no ``strftime`` method (e.g. if it is
            ``None``)
        """
        # https://docs.python.org/3.4/library/datetime.html#strftime-strptime-behavior  # noqa
        x = coerce_to_pendulum(x)
        try:
            mainpart = x.strftime(
                "%Y-%m-%dT%H:%M:%S.%f"
            )  # microsecond accuracy
            timezone = x.strftime("%z")  # won't have the colon in
            # Insert the colon before the final two (minutes) digits.
            return mainpart + timezone[:-2] + ":" + timezone[-2:]
        except AttributeError:
            return None

    @staticmethod
    def isostring_to_pendulum(x: Optional[str]) -> Optional[Pendulum]:
        """
        From an ISO-formatted string to a Python Pendulum, with timezone.

        Returns ``None`` (after logging a warning) if the string cannot be
        parsed.
        """
        try:
            return coerce_to_pendulum(x)
        except (ParserError, ValueError):
            log.warning("Bad ISO date/time string: {!r}", x)
            return None

    def process_bind_param(
        self, value: Optional[Pendulum], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database.
        """
        retval = self.pendulum_to_isostring(value)
        if DEBUG_DATETIME_AS_ISO_TEXT:
            log.debug(
                "{}.process_bind_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_literal_param(
        self, value: Optional[Pendulum], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert literals on the way from Python to the database.
        """
        retval = self.pendulum_to_isostring(value)
        if DEBUG_DATETIME_AS_ISO_TEXT:
            log.debug(
                "{}.process_literal_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[Pendulum]:
        """
        Convert things on the way from the database to Python.
        """
        retval = self.isostring_to_pendulum(value)
        if DEBUG_DATETIME_AS_ISO_TEXT:
            log.debug(
                "{}.process_result_value("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    # noinspection PyPep8Naming
    class comparator_factory(TypeDecorator.Comparator):
        """
        Process SQL for when we are comparing our column, in the database,
        to something else.

        We make this dialect-independent by calling functions like

        .. code-block:: none

            unknown_field_to_utcdatetime
            isotzdatetime_to_utcdatetime

        ... which we then specialize for specific dialects.

        This function itself does not appear to be able to access any
        information about the dialect.
        """

        def operate(
            self, op: "OperatorType", *other: Any, **kwargs: Any
        ) -> "ColumnElement[_CT]":
            """
            Apply ``op`` to our column (converted to a UTC date/time) and a
            single other operand.

            If the operand can be coerced to a Pendulum date/time, it is
            converted to UTC in Python; otherwise it is wrapped in
            :class:`unknown_field_to_utcdatetime` for conversion in SQL.
            """
            assert len(other) == 1
            assert not kwargs
            other = other[0]
            try:
                processed_other = convert_datetime_to_utc(
                    coerce_to_pendulum(other)
                )
                # - If you try to call a dialect-specialized FunctionElement,
                #   it processes the clause to "?" (meaning "attach bind
                #   parameter here"); it's not the value itself.
                # - For our SQLite "milliseconds only" comparator problem
                #   (see isotzdatetime_to_utcdatetime_sqlite), we can't do
                #   very much here without knowing the dialect. So we make
                #   the SQLite side look like it has microseconds by
                #   appending "000"...
            except (AttributeError, ParserError, TypeError, ValueError):
                # OK. At this point, "other" could be a plain DATETIME field,
                # or a PendulumDateTimeAsIsoTextColType field (or potentially
                # something else that we don't really care about). If it's a
                # DATETIME, then we assume it is already in UTC.
                processed_other = unknown_field_to_utcdatetime(other)  # type: ignore[assignment]  # noqa: E501
            if DEBUG_DATETIME_AS_ISO_TEXT:
                log.debug(
                    "operate(self={!r}, op={!r}, other={!r})", self, op, other
                )
                log.debug("self.expr = {!r}", self.expr)
                log.debug("processed_other = {!r}", processed_other)
                # traceback.print_stack()
            return op(isotzdatetime_to_utcdatetime(self.expr), processed_other)

        def reverse_operate(
            self, op: "OperatorType", *other: Any, **kwargs: Any
        ) -> NoReturn:
            """
            Not expected to be called; always raises ``AssertionError``.
            """
            # Raise explicitly rather than via "assert False": assert
            # statements are removed under "python -O", in which case this
            # method would silently return None, despite its NoReturn
            # annotation.
            raise AssertionError("I don't think this is ever being called")

843 

844 

845# ============================================================================= 

846# Custom duration field as ISO-8601 text, using pendulum.Duration on the Python 

847# side. 

848# ============================================================================= 

849 

850 

class PendulumDurationAsIsoTextColType(TypeDecorator):
    """
    Column type that keeps a time duration as ISO-8601 text (in one specific
    format) in the database and as a :class:`pendulum.Duration` in Python.
    """

    # Underlying SQL type:
    impl = String(length=StringLengths.ISO8601_DURATION_STRING_MAX_LEN)

    cache_ok = False

    _coltype_name = "PendulumDurationAsIsoTextColType"

    @property
    def python_type(self) -> type:
        """
        The Python-side type handled by this column type.
        """
        return Duration

    @staticmethod
    def pendulum_duration_to_isostring(x: Optional[Duration]) -> Optional[str]:
        """
        Converts a :class:`pendulum.Duration` to an ISO-formatted string in
        our particular format; ``None`` is passed through as ``None``
        (``NULL``).
        """
        return (
            None
            if x is None
            else duration_to_iso(
                x, permit_years_months=True, minus_sign_at_front=True
            )
        )

    @staticmethod
    def isostring_to_pendulum_duration(x: Optional[str]) -> Optional[Duration]:
        """
        Converts an ISO-formatted duration string to a
        :class:`pendulum.Duration`.

        Returns ``None`` for ``None`` (``NULL``) or a blank string, and also
        (after logging a warning) for a string that cannot be parsed.
        """
        if not x:  # None (NULL) or blank string
            return None
        try:
            parsed = duration_from_iso(x)
        except (ISO8601Error, ValueError):
            log.warning("Bad ISO duration string: {!r}", x)
            return None
        return parsed

    def process_bind_param(
        self, value: Optional[Duration], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert a bound parameter on its way from Python to the database.
        """
        as_text = self.pendulum_duration_to_isostring(value)
        if not DEBUG_DURATION_AS_ISO_TEXT:
            return as_text
        log.debug(
            "{}.process_bind_param("
            "self={!r}, value={!r}, dialect={!r}) -> {!r}",
            self._coltype_name,
            self,
            value,
            dialect,
            as_text,
        )
        return as_text

    def process_literal_param(
        self, value: Optional[Duration], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert a literal on its way from Python to the database.
        """
        as_text = self.pendulum_duration_to_isostring(value)
        if not DEBUG_DURATION_AS_ISO_TEXT:
            return as_text
        log.debug(
            "{}.process_literal_param("
            "self={!r}, value={!r}, dialect={!r}) -> {!r}",
            self._coltype_name,
            self,
            value,
            dialect,
            as_text,
        )
        return as_text

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[Duration]:
        """
        Convert a value on its way from the database to Python.
        """
        as_duration = self.isostring_to_pendulum_duration(value)
        if not DEBUG_DURATION_AS_ISO_TEXT:
            return as_duration
        log.debug(
            "{}.process_result_value("
            "self={!r}, value={!r}, dialect={!r}) -> {!r}",
            self._coltype_name,
            self,
            value,
            dialect,
            as_duration,
        )
        return as_duration

    # No comparator_factory; we do not use SQL to compare ISO durations.

954 

955 

956# ============================================================================= 

957# Semantic version column type 

958# ============================================================================= 

959 

960 

class SemanticVersionColType(TypeDecorator):
    """
    Stores semantic versions in the database.
    Uses :class:`semantic_version.Version` on the Python side.
    """

    impl = String(length=147)  # https://github.com/mojombo/semver/issues/79

    cache_ok = False

    _coltype_name = "SemanticVersionColType"

    @property
    def python_type(self) -> type:
        """
        The Python type of the object.
        """
        return Version

    def process_bind_param(
        self, value: Optional[Version], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database.

        ``None`` stays ``None``; anything else is stored as ``str(value)``.
        """
        retval = str(value) if value is not None else None
        if DEBUG_SEMANTIC_VERSION:
            log.debug(
                "{}.process_bind_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_literal_param(
        self, value: Optional[Version], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert literals on the way from Python to the database.

        ``None`` stays ``None``; anything else is rendered as ``str(value)``.
        """
        retval = str(value) if value is not None else None
        if DEBUG_SEMANTIC_VERSION:
            # The final placeholder must be "{!r}" (with braces), as in the
            # sibling methods; otherwise the return value is not shown.
            log.debug(
                "{}.process_literal_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[Version]:
        """
        Convert things on the way from the database to Python.

        ``None`` stays ``None``; anything else goes through
        ``make_version()``.
        """
        if value is None:
            retval = None
        else:
            # Here we do some slightly fancier conversion to deal with all
            # sorts of potential rubbish coming in, so we get a properly
            # ordered Version out:
            retval = make_version(value)
        if DEBUG_SEMANTIC_VERSION:
            log.debug(
                "{}.process_result_value("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    '''
    # noinspection PyPep8Naming
    class comparator_factory(TypeDecorator.Comparator):
        """
        Process SQL for when we are comparing our column, in the database,
        to something else.

        See https://docs.sqlalchemy.org/en/13/core/type_api.html#sqlalchemy.types.TypeEngine.comparator_factory.

        .. warning::

            I'm not sure this is either (a) correct or (b) used; it may
            produce a string comparison of e.g. ``14.0.0`` versus ``2.0.0``,
            which will be alphabetical and therefore wrong.
            Disabled on 2019-04-28.

        """  # noqa

        def operate(self, op, *other, **kwargs):
            assert len(other) == 1
            assert not kwargs
            other = other[0]
            if isinstance(other, Version):
                processed_other = str(Version)
            else:
                processed_other = other
            return op(self.expr, processed_other)

        def reverse_operate(self, op, *other, **kwargs):
            assert False, "I don't think this is ever being called"
    '''

1074 

1075 

1076# ============================================================================= 

1077# IdNumReferenceListColType 

1078# ============================================================================= 

1079 

1080 

class IdNumReferenceListColType(TypeDecorator):
    """
    Stores a list of IdNumReference objects.
    On the database side, uses a comma-separated list of integers.
    """

    impl = Text()

    # Set explicitly, matching the other TypeDecorator subclasses in this
    # module.
    cache_ok = False

    _coltype_name = "IdNumReferenceListColType"

    @property
    def python_type(self) -> type:
        """
        The Python type of the object.
        """
        return list

    @staticmethod
    def _idnumdef_list_to_dbstr(
        idnumdef_list: Optional[List[IdNumReference]],
    ) -> str:
        """
        Converts an optional list of
        :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
        objects to a CSV string suitable for storing in the database.

        Each object contributes two consecutive elements: ``which_idnum``
        then ``idnum_value``. ``None`` or an empty list gives ``""``.
        """
        if not idnumdef_list:
            return ""
        return ",".join(
            str(x)
            for idnumdef in idnumdef_list
            for x in (idnumdef.which_idnum, idnumdef.idnum_value)
        )

    @staticmethod
    def _dbstr_to_idnumdef_list(dbstr: Optional[str]) -> List[IdNumReference]:
        """
        Converts a CSV string (from the database) to a list of
        :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
        objects.

        Returns an empty list if the string is ``None``, contains anything
        that is not an integer, has an odd number of elements, or contains
        a negative number.
        """
        idnumdef_list = []  # type: List[IdNumReference]
        try:
            intlist = [int(numstr) for numstr in dbstr.split(",")]
        except (AttributeError, TypeError, ValueError):
            # AttributeError: dbstr is None; ValueError: non-integer text
            # (including the empty string).
            return []
        length = len(intlist)
        if length == 0 or length % 2 != 0:  # enforce pairs
            return []
        for which_idnum, idnum_value in chunks(intlist, n=2):
            if which_idnum < 0 or idnum_value < 0:
                # enforce non-negative integers
                return []
            idnumdef_list.append(
                IdNumReference(
                    which_idnum=which_idnum, idnum_value=idnum_value
                )
            )
        return idnumdef_list

    def process_bind_param(
        self, value: Optional[List[IdNumReference]], dialect: Dialect
    ) -> str:
        """
        Convert parameters on the way from Python to the database.
        """
        retval = self._idnumdef_list_to_dbstr(value)
        if DEBUG_IDNUMDEF_LIST:
            log.debug(
                "{}.process_bind_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_literal_param(
        self, value: Optional[List[IdNumReference]], dialect: Dialect
    ) -> str:
        """
        Convert literals on the way from Python to the database.
        """
        retval = self._idnumdef_list_to_dbstr(value)
        if DEBUG_IDNUMDEF_LIST:
            # The final placeholder must be "{!r}" (with braces), as in the
            # sibling methods; otherwise the return value is not shown.
            log.debug(
                "{}.process_literal_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> List[IdNumReference]:
        """
        Convert things on the way from the database to Python.
        """
        retval = self._dbstr_to_idnumdef_list(value)
        if DEBUG_IDNUMDEF_LIST:
            log.debug(
                "{}.process_result_value("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

1195 

1196 

1197# ============================================================================= 

1198# UUID column type 

1199# ============================================================================= 

1200 

1201 

class UuidColType(TypeDecorator):
    """
    Stores a :class:`uuid.UUID` as 32 hexadecimal characters.
    """

    # Based on:
    # https://docs.sqlalchemy.org/en/13/core/custom_types.html#backend-agnostic-guid-type  # noqa: E501
    # which will use postgresql UUID if relevant, not doing that here

    impl = CHAR(32)

    cache_ok = False

    @property
    def python_type(self) -> type:
        # NOTE(review): this reports str, although process_result_value()
        # returns uuid.UUID objects -- confirm whether that is intended.
        return str

    def process_bind_param(
        self, value: Optional[uuid.UUID], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database.

        ``None`` stays ``None``; otherwise the UUID's integer value is
        written as hexadecimal, zero-padded to at least 32 digits.
        """
        if value is None:
            return None

        return "%.32x" % value.int

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[uuid.UUID]:
        """
        Convert things on the way from the database to Python.

        ``None`` stays ``None``; otherwise the text is passed to
        :class:`uuid.UUID`.
        """
        if value is None:
            return None

        return uuid.UUID(value)

1236 

1237 

1238# ============================================================================= 

1239# JSON column type 

1240# ============================================================================= 

1241 

1242 

class JsonColType(TypeDecorator):
    """
    Stores a Python value as JSON text, via :func:`json.dumps` and
    :func:`json.loads`.
    """

    # Unlike
    # https://docs.sqlalchemy.org/en/13/core/type_basics.html#sqlalchemy.types.JSON
    # does not use vendor-specific JSON type
    impl = UnicodeText

    cache_ok = False

    @property
    def python_type(self) -> type:
        # NOTE(review): this reports str, although process_result_value()
        # returns whatever json.loads() produces -- confirm whether that is
        # intended.
        return str

    def process_bind_param(
        self, value: Any, dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database.

        ``None`` stays ``None`` (it is not encoded as JSON ``null``);
        anything else is serialized with :func:`json.dumps`.
        """
        if value is None:
            return None

        return json.dumps(value)

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Any:
        """
        Convert things on the way from the database to Python.

        ``None`` stays ``None``; anything else is parsed with
        :func:`json.loads`.
        """
        if value is None:
            return None

        return json.loads(value)

1274 

1275 

1276# ============================================================================= 

1277# Phone number column type 

1278# ============================================================================= 

1279 

1280 

class PhoneNumberColType(TypeDecorator):
    """
    Stores a phone number as text in E164 format, converting with the
    ``phonenumbers`` library.
    """

    impl = Unicode(length=StringLengths.PHONE_NUMBER_MAX_LEN)

    cache_ok = False

    @property
    def python_type(self) -> type:
        # NOTE(review): this reports str, although process_result_value()
        # returns the result of phonenumbers.parse() -- confirm whether that
        # is intended.
        return str

    def process_bind_param(
        self, value: Any, dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database.

        ``None`` stays ``None``; anything else is formatted with
        ``phonenumbers.format_number()`` in E164 format.
        """
        if value is None:
            return None

        return phonenumbers.format_number(
            value, phonenumbers.PhoneNumberFormat.E164
        )

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Any:
        """
        Convert things on the way from the database to Python.

        ``None`` and the empty string both give ``None``; anything else is
        parsed with ``phonenumbers.parse()``.
        """
        if not value:
            return None

        # Should be stored as E164 so no need to pass a region
        return phonenumbers.parse(value, None)

1312 

1313 

1314# ============================================================================= 

1315# PermittedValueChecker: used by camcops_column 

1316# ============================================================================= 

1317 

1318 

class PermittedValueChecker(object):
    """
    Represents permitted values (in columns belonging to CamCOPS tasks), and
    checks a value against them.
    """

    def __init__(
        self,
        not_null: bool = False,
        minimum: Optional[Union[int, float]] = None,
        maximum: Optional[Union[int, float]] = None,
        permitted_values: Optional[Sequence[Any]] = None,
    ) -> None:
        """
        Args:
            not_null: must the value not be NULL?
            minimum: if specified, a numeric minimum value
            maximum: if specified, a numeric maximum value
            permitted_values: if specified, a list of permitted values
        """
        self.not_null = not_null
        self.minimum = minimum
        self.maximum = maximum
        self.permitted_values = permitted_values

    def is_ok(self, value: Any) -> bool:
        """
        Does the value pass our tests?
        """
        if value is None:
            # If not_null is True, then the value is not OK; return False.
            # If not_null is False, then a null value passes all other tests.
            return not self.not_null
        if (
            self.permitted_values is not None
            and value not in self.permitted_values
        ):
            return False
        if self.minimum is not None and value < self.minimum:
            return False
        if self.maximum is not None and value > self.maximum:
            return False
        return True

    def failure_msg(self, value: Any) -> str:
        """
        Why does the value not pass our tests?

        Returns:
            a description of the first failing test (checked in the same
            order as :meth:`is_ok`), or ``""`` if the value is OK
        """
        if value is None:
            if self.not_null:
                return "value is None and NULL values are not permitted"
            else:
                return ""  # value is OK
        if (
            self.permitted_values is not None
            and value not in self.permitted_values
        ):
            return (
                f"value {value!r} not in permitted values "
                f"{self.permitted_values!r}"
            )
        if self.minimum is not None and value < self.minimum:
            return f"value {value!r} less than minimum of {self.minimum!r}"
        if self.maximum is not None and value > self.maximum:
            return f"value {value!r} more than maximum of {self.maximum!r}"
        return ""

    def __repr__(self) -> str:
        return auto_repr(self)

    def permitted_values_inc_minmax(self) -> Tuple:
        """
        Returns permitted values, either specified directly or via a
        minimum/maximum.

        Returns an empty tuple if neither route yields a list of values.
        """
        # Truthiness test: an empty permitted_values sequence is treated
        # here like an unspecified one.
        if self.permitted_values:
            return tuple(self.permitted_values)
        # Take a punt that integer minima/maxima mean that only integers are
        # permitted...
        if isinstance(self.minimum, int) and isinstance(self.maximum, int):
            return tuple(range(self.minimum, self.maximum + 1))
        return ()

    def permitted_values_csv(self) -> str:
        """
        Returns a CSV representation of the permitted values.

        Primarily used for CRIS data dictionaries.
        """
        return ",".join(str(x) for x in self.permitted_values_inc_minmax())

1409 

1410 

1411# Specific instances, to reduce object duplication and magic numbers: 

1412 

# Lower bound only (minimum 0, no maximum):
MIN_ZERO_CHECKER = PermittedValueChecker(minimum=0)

# Explicit list of permitted values, taken from PV.BIT:
BIT_CHECKER = PermittedValueChecker(permitted_values=PV.BIT)
# Ranges with a minimum of 0 and the named maximum:
ZERO_TO_ONE_CHECKER = PermittedValueChecker(minimum=0, maximum=1)
ZERO_TO_TWO_CHECKER = PermittedValueChecker(minimum=0, maximum=2)
ZERO_TO_THREE_CHECKER = PermittedValueChecker(minimum=0, maximum=3)
ZERO_TO_FOUR_CHECKER = PermittedValueChecker(minimum=0, maximum=4)
ZERO_TO_FIVE_CHECKER = PermittedValueChecker(minimum=0, maximum=5)
ZERO_TO_SIX_CHECKER = PermittedValueChecker(minimum=0, maximum=6)
ZERO_TO_SEVEN_CHECKER = PermittedValueChecker(minimum=0, maximum=7)
ZERO_TO_EIGHT_CHECKER = PermittedValueChecker(minimum=0, maximum=8)
ZERO_TO_NINE_CHECKER = PermittedValueChecker(minimum=0, maximum=9)
ZERO_TO_10_CHECKER = PermittedValueChecker(minimum=0, maximum=10)
ZERO_TO_100_CHECKER = PermittedValueChecker(minimum=0, maximum=100)

# Ranges with a minimum of 1 and the named maximum:
ONE_TO_TWO_CHECKER = PermittedValueChecker(minimum=1, maximum=2)
ONE_TO_THREE_CHECKER = PermittedValueChecker(minimum=1, maximum=3)
ONE_TO_FOUR_CHECKER = PermittedValueChecker(minimum=1, maximum=4)
ONE_TO_FIVE_CHECKER = PermittedValueChecker(minimum=1, maximum=5)
ONE_TO_SIX_CHECKER = PermittedValueChecker(minimum=1, maximum=6)
ONE_TO_SEVEN_CHECKER = PermittedValueChecker(minimum=1, maximum=7)
ONE_TO_EIGHT_CHECKER = PermittedValueChecker(minimum=1, maximum=8)
ONE_TO_NINE_CHECKER = PermittedValueChecker(minimum=1, maximum=9)
# ONE_TO_10_CHECKER = PermittedValueChecker(minimum=1, maximum=10)

1437 

1438 

1439# ============================================================================= 

1440# camcops_column: provides extra functions over Column. 

1441# ============================================================================= 

1442 

1443# Column attributes 

# Keys of the "info" dictionary that camcops_column() and
# mapped_camcops_column() attach to the columns they create. Each value
# matches the name of the corresponding keyword argument of those functions
# (except COLATTR_IS_CAMCOPS_COLUMN, which they always set to True).
COLATTR_BLOB_RELATIONSHIP_ATTR_NAME = "blob_relationship_attr_name"
COLATTR_EXEMPT_FROM_ANONYMISATION = "exempt_from_anonymisation"
COLATTR_IDENTIFIES_PATIENT = "identifies_patient"
COLATTR_INCLUDE_IN_ANON_STAGING_DB = "include_in_anon_staging_db"
COLATTR_IS_BLOB_ID_FIELD = "is_blob_id_field"
COLATTR_IS_CAMCOPS_COLUMN = "is_camcops_column"
COLATTR_PERMITTED_VALUE_CHECKER = "permitted_value_checker"

1451 

1452 

def _camcops_column_info(
    include_in_anon_staging_db: bool,
    exempt_from_anonymisation: bool,
    identifies_patient: bool,
    is_blob_id_field: bool,
    blob_relationship_attr_name: str,
    permitted_value_checker: Optional[PermittedValueChecker],
) -> dict[str, Any]:
    """
    Builds the ``info`` dictionary shared by :func:`camcops_column` and
    :func:`mapped_camcops_column`, after checking that a BLOB ID field also
    names its relationship attribute.
    """
    if is_blob_id_field:
        assert blob_relationship_attr_name, (
            "If specifying a BLOB ID field, must give the attribute name "
            "of the relationship too"
        )
    return {
        COLATTR_IS_CAMCOPS_COLUMN: True,
        COLATTR_INCLUDE_IN_ANON_STAGING_DB: include_in_anon_staging_db,
        COLATTR_EXEMPT_FROM_ANONYMISATION: exempt_from_anonymisation,
        COLATTR_IDENTIFIES_PATIENT: identifies_patient,
        COLATTR_IS_BLOB_ID_FIELD: is_blob_id_field,
        COLATTR_BLOB_RELATIONSHIP_ATTR_NAME: blob_relationship_attr_name,
        COLATTR_PERMITTED_VALUE_CHECKER: permitted_value_checker,
    }


def camcops_column(
    *args: Any,
    include_in_anon_staging_db: bool = False,
    exempt_from_anonymisation: bool = False,
    identifies_patient: bool = False,
    is_blob_id_field: bool = False,
    blob_relationship_attr_name: str = "",
    permitted_value_checker: Optional[PermittedValueChecker] = None,
    **kwargs: Any,
) -> Column[Any]:
    """
    Creates a :class:`Column` whose ``info`` dictionary carries the
    CamCOPS-specific settings below.

    Args:
        *args:
            Arguments to the :class:`Column` constructor.
        include_in_anon_staging_db:
            Ensure this is marked for inclusion in data dictionaries for an
            anonymisation staging database.
        exempt_from_anonymisation:
            If true: though this field might be text, it is guaranteed not
            to contain identifiers (e.g. it might contain only predefined
            disease severity descriptions) and does not require
            anonymisation.
        identifies_patient:
            If true: contains a patient identifier (e.g. name).
        is_blob_id_field:
            If true: this field contains a reference (client FK) to the
            BLOB table.
        blob_relationship_attr_name:
            For BLOB ID fields: the name of the associated relationship
            attribute (which, when accessed, yields the BLOB itself) in
            the owning class/object.
        permitted_value_checker:
            If specified, a :class:`PermittedValueChecker` that allows
            soft constraints to be specified on the field's contents. (That
            is, no constraints are specified at the database level, but we
            can moan if incorrect data are present.)
        **kwargs:
            Arguments to the :class:`Column` constructor.

    Raises:
        AssertionError: if ``is_blob_id_field`` is set without a
            ``blob_relationship_attr_name``
    """
    info = _camcops_column_info(
        include_in_anon_staging_db=include_in_anon_staging_db,
        exempt_from_anonymisation=exempt_from_anonymisation,
        identifies_patient=identifies_patient,
        is_blob_id_field=is_blob_id_field,
        blob_relationship_attr_name=blob_relationship_attr_name,
        permitted_value_checker=permitted_value_checker,
    )
    return Column(*args, info=info, **kwargs)


def mapped_camcops_column(  # type: ignore[no-untyped-def]
    *args,
    include_in_anon_staging_db: bool = False,
    exempt_from_anonymisation: bool = False,
    identifies_patient: bool = False,
    is_blob_id_field: bool = False,
    blob_relationship_attr_name: str = "",
    permitted_value_checker: Optional[PermittedValueChecker] = None,
    **kwargs,
) -> MappedColumn[Any]:
    """
    As :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` but
    returns a python typing-compatible MappedColumn.

    Args:
        *args:
            Arguments to the :class:`Column` constructor.
        include_in_anon_staging_db:
            Ensure this is marked for inclusion in data dictionaries for an
            anonymisation staging database.
        exempt_from_anonymisation:
            If true: though this field might be text, it is guaranteed not
            to contain identifiers (e.g. it might contain only predefined
            disease severity descriptions) and does not require
            anonymisation.
        identifies_patient:
            If true: contains a patient identifier (e.g. name).
        is_blob_id_field:
            If true: this field contains a reference (client FK) to the
            BLOB table.
        blob_relationship_attr_name:
            For BLOB ID fields: the name of the associated relationship
            attribute (which, when accessed, yields the BLOB itself) in
            the owning class/object.
        permitted_value_checker:
            If specified, a :class:`PermittedValueChecker` that allows
            soft constraints to be specified on the field's contents. (That
            is, no constraints are specified at the database level, but we
            can moan if incorrect data are present.)
        **kwargs:
            Arguments to the :class:`Column` constructor.

    Raises:
        AssertionError: if ``is_blob_id_field`` is set without a
            ``blob_relationship_attr_name``
    """
    info = _camcops_column_info(
        include_in_anon_staging_db=include_in_anon_staging_db,
        exempt_from_anonymisation=exempt_from_anonymisation,
        identifies_patient=identifies_patient,
        is_blob_id_field=is_blob_id_field,
        blob_relationship_attr_name=blob_relationship_attr_name,
        permitted_value_checker=permitted_value_checker,
    )
    return mapped_column(*args, info=info, **kwargs)

1566 

1567 

1568# ============================================================================= 

1569# Operate on Column/MappedColumn properties 

1570# ============================================================================= 

1571 

1572 

def gen_columns_matching_attrnames(  # type: ignore[no-untyped-def]
    obj, attrnames: List[str]
) -> Generator[Tuple[str, Column], None, None]:
    """
    Generates those columns of an SQLAlchemy ORM object whose attribute
    names appear in a given list.

    Args:
        obj: SQLAlchemy ORM object to inspect
        attrnames: attribute names to keep

    Yields:
        ``attrname, column`` tuples

    """
    for name, col in gen_columns(obj):
        if name not in attrnames:
            continue
        yield name, col

1591 

1592 

def gen_camcops_columns(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[Tuple[str, Column], None, None]:
    """
    Generates the columns of an object that were created as
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`
    columns, i.e. whose ``info`` dictionary is flagged accordingly.

    Args:
        obj: SQLAlchemy ORM object to inspect

    Yields:
        ``attrname, column`` tuples
    """
    for name, col in gen_columns(obj):
        is_camcops = col.info.get(COLATTR_IS_CAMCOPS_COLUMN, False)
        if not is_camcops:
            continue
        yield name, col

1609 

1610 

def gen_camcops_blob_columns(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[Tuple[str, Column], None, None]:
    """
    Generates the
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` columns
    of an object that are flagged as BLOB ID fields (references to the BLOB
    table).

    A warning is logged for any such column whose attribute name differs
    from its SQL column name; the column is still yielded.

    Args:
        obj: SQLAlchemy ORM object to inspect

    Yields:
        ``attrname, column`` tuples
    """
    for name, col in gen_camcops_columns(obj):
        if not col.info.get(COLATTR_IS_BLOB_ID_FIELD, False):
            continue
        if name != col.name:
            log.warning(
                "BLOB field where attribute name {!r} != SQL "
                "column name {!r}",
                name,
                col.name,
            )
        yield name, col

1635 

1636 

def get_column_attr_names(obj) -> List[str]:  # type: ignore[no-untyped-def]
    """
    Returns the attribute names of all columns of an SQLAlchemy ORM object,
    as a list.
    """
    names = []
    for name, _column in gen_columns(obj):
        names.append(name)
    return names

1642 

1643 

def get_camcops_column_attr_names(obj) -> List[str]:  # type: ignore[no-untyped-def]  # noqa: E501
    """
    Returns the attribute names of all
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` columns
    of an SQLAlchemy ORM object, as a list.
    """
    names = []
    for name, _column in gen_camcops_columns(obj):
        names.append(name)
    return names

1651 

1652 

def get_camcops_blob_column_attr_names(obj) -> List[str]:  # type: ignore[no-untyped-def]  # noqa: E501
    """
    Returns the attribute names of all
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` BLOB
    columns of an SQLAlchemy ORM object, as a list.
    """
    names = []
    for name, _column in gen_camcops_blob_columns(obj):
        names.append(name)
    return names

1660 

1661 

def permitted_value_failure_msgs(obj) -> List[str]:  # type: ignore[no-untyped-def]  # noqa: E501
    """
    Checks an SQLAlchemy ORM object instance against the permitted value
    checkers attached to its
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`
    columns, if there are any.

    Returns a list of failure messages, one per failing attribute (an empty
    list means all OK).

    If you only need a yes/no answer, :func:`permitted_values_ok` is
    quicker.
    """
    problems = []
    for name, col in gen_camcops_columns(obj):
        checker = col.info.get(
            COLATTR_PERMITTED_VALUE_CHECKER
        )  # type: Optional[PermittedValueChecker]
        if checker is not None:
            reason = checker.failure_msg(getattr(obj, name))
            if reason:
                problems.append(f"Invalid value for {name}: {reason}")
    return problems

1685 

1686 

def permitted_values_ok(obj) -> bool:  # type: ignore[no-untyped-def]
    """
    Does an instance pass all of its permitted value checks (if it has
    any)? Stops at the first failure.

    To find out why it failed, use :func:`permitted_value_failure_msgs`.
    """
    for name, col in gen_camcops_columns(obj):
        checker = col.info.get(
            COLATTR_PERMITTED_VALUE_CHECKER
        )  # type: Optional[PermittedValueChecker]
        if checker is not None and not checker.is_ok(getattr(obj, name)):
            return False
    return True

1705 

1706 

def gen_ancillary_relationships(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[
    Tuple[str, RelationshipProperty, Type["GenericTabletRecordMixin"]],
    None,
    None,
]:
    """
    Generates ``attrname, relationship_property, related_class`` tuples for
    those relationships of an SQLAlchemy ORM object that are marked as
    CamCOPS ancillary relationships (the marker must be exactly ``True``).
    """
    for name, relationship, target_class in gen_relationships(obj):
        marker = relationship.info.get(RelationshipInfo.IS_ANCILLARY, None)
        if marker is not True:
            continue
        yield name, relationship, target_class

1722 

1723 

def gen_blob_relationships(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[
    Tuple[str, RelationshipProperty, Type["GenericTabletRecordMixin"]],
    None,
    None,
]:
    """
    Generates ``attrname, relationship_property, related_class`` tuples for
    those relationships of an SQLAlchemy ORM object that are marked as
    CamCOPS BLOB relationships (the marker must be exactly ``True``).
    """
    for name, relationship, target_class in gen_relationships(obj):
        marker = relationship.info.get(RelationshipInfo.IS_BLOB, None)
        if marker is not True:
            continue
        yield name, relationship, target_class

1739 

1740 

1741# ============================================================================= 

1742# Specializations of camcops_column to save typing 

1743# ============================================================================= 

1744 

1745 

def bool_column(name: str, *args: Any, **kwargs: Any) -> Column[bool]:
    """
    Creates a Boolean
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`.

    The column type and the adjusted keyword arguments come from
    :func:`_get_bool_column_args`, which consumes an optional
    ``constraint_name`` keyword argument.
    """
    boolean_type = _get_bool_column_args(name, kwargs)
    return camcops_column(name, boolean_type, *args, **kwargs)

1750 

1751 

def mapped_bool_column(
    name: str, *args: Any, **kwargs: Any
) -> MappedColumn[bool]:
    """
    Creates a Boolean
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.mapped_camcops_column`.

    The column type and the adjusted keyword arguments come from
    :func:`_get_bool_column_args`, which consumes an optional
    ``constraint_name`` keyword argument.
    """
    boolean_type = _get_bool_column_args(name, kwargs)
    return mapped_camcops_column(name, boolean_type, *args, **kwargs)

1758 

1759 

def _get_bool_column_args(name: str, kwargs: dict[str, Any]) -> Boolean:
    """
    Prepares the type argument for a Boolean column, modifying ``kwargs`` in
    place.

    Args:
        name: the column name (used only for the long-name warning)
        kwargs: keyword arguments destined for the column constructor;
            ``constraint_name`` is removed from it, and its permitted value
            checker entry is set to ``BIT_CHECKER``

    Returns:
        a :class:`Boolean` type whose constraint is named after
        ``constraint_name`` (wrapped in ``conv``), or unnamed if none was
        given
    """
    requested_name = kwargs.pop(
        "constraint_name", None
    )  # type: Optional[str]
    # For the wrapping of the name, see help for ``conv``.
    # The "name" parameter to Boolean() specifies the name of the
    # (0, 1) constraint.
    boolean_type = Boolean(
        name=conv(requested_name) if requested_name else None
    )
    kwargs[COLATTR_PERMITTED_VALUE_CHECKER] = BIT_CHECKER

    if requested_name or len(name) < LONG_COLUMN_NAME_WARNING_LIMIT:
        return boolean_type

    log.warning(
        "bool_column with long column name and no constraint name: {!r}",
        name,
    )
    return boolean_type