Coverage for cc_modules/cc_sqla_coltypes.py : 61%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
3"""
4camcops_server/cc_modules/cc_sqla_coltypes.py
6===============================================================================
8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com).
10 This file is part of CamCOPS.
12 CamCOPS is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
17 CamCOPS is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License for more details.
22 You should have received a copy of the GNU General Public License
23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
25===============================================================================
27**SQLAlchemy column types used by CamCOPS.**
29Note these built-in SQLAlchemy types
30(https://docs.sqlalchemy.org/en/latest/core/type_basics.html#generic-types):
32 =============== ===========================================================
33 SQLAlchemy type Comment
34 =============== ===========================================================
35 BigInteger MySQL: -9,223,372,036,854,775,808 to
36 9,223,372,036,854,775,807 (64-bit)
37 (compare NHS number: up to 9,999,999,999)
38 Boolean
39 Date
40 DateTime
41 Enum
42 Float
43 Integer MySQL: -2,147,483,648 to 2,147,483,647 (32-bit)
44 Interval For ``datetime.timedelta``
45 LargeBinary Under MySQL, maps to ``BLOB``
46 MatchType For the return type of the ``MATCH`` operator
47 Numeric For fixed-precision numbers like ``NUMERIC`` or ``DECIMAL``
48 PickleType
49 SchemaType
50 SmallInteger
51 String ``VARCHAR``
52 Text Variably sized string type.
53 (Under MySQL, renders as ``TEXT``.)
54 Time
55 Unicode Implies that the underlying column explicitly supports
56 Unicode
57 UnicodeText Variably sized version of Unicode
58 (Under MySQL, renders as ``TEXT`` too.)
59 =============== ===========================================================
61Not supported across all platforms:
63 =============== ===========================================================
64 SQL type Comment
65 =============== ===========================================================
66 BIGINT UNSIGNED MySQL: 0 to 18,446,744,073,709,551,615 (64-bit).
67 Use ``sqlalchemy.dialects.mysql.BIGINT(unsigned=True)``.
68 INT UNSIGNED MySQL: 0 to 4,294,967,295 (32-bit).
69 Use ``sqlalchemy.dialects.mysql.INTEGER(unsigned=True)``.
70 =============== ===========================================================
72Other MySQL sizes:
74 =============== ===========================================================
75 MySQL type Comment
76 =============== ===========================================================
77 TINYBLOB 2^8 bytes = 256 bytes
78 BLOB 2^16 bytes = 64 KiB
79 MEDIUMBLOB 2^24 bytes = 16 MiB
80 LONGBLOB 2^32 bytes = 4 GiB
81 TINYTEXT 255 (2^8 - 1) bytes
82 TEXT 65,535 bytes (2^16 - 1) = 64 KiB
83 MEDIUMTEXT 16,777,215 (2^24 - 1) bytes = 16 MiB
84 LONGTEXT 4,294,967,295 (2^32 - 1) bytes = 4 GiB
85 =============== ===========================================================
87See https://stackoverflow.com/questions/13932750/tinytext-text-mediumtext-and-longtext-maximum-storage-sizes.
89Also notes:
91- Columns may need their character set specified explicitly under MySQL:
92 https://stackoverflow.com/questions/2108824/mysql-incorrect-string-value-error-when-save-unicode-string-in-django
94""" # noqa
96# =============================================================================
97# Imports
98# =============================================================================
100import json
101import logging
102from typing import (Any, Generator, List, Optional, Tuple, Type, TYPE_CHECKING,
103 Union)
104import uuid
106from cardinal_pythonlib.datetimefunc import (
107 coerce_to_pendulum,
108 convert_datetime_to_utc,
109 duration_from_iso,
110 duration_to_iso,
111 PotentialDatetimeType,
112)
113from cardinal_pythonlib.lists import chunks
114from cardinal_pythonlib.logs import (
115 BraceStyleAdapter,
116)
117from cardinal_pythonlib.reprfunc import auto_repr
118from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
119from cardinal_pythonlib.sqlalchemy.orm_inspect import (
120 gen_columns,
121 gen_relationships,
122)
123from cardinal_pythonlib.sqlalchemy.sqlfunc import (
124 fail_unknown_dialect,
125 fetch_processed_single_clause
126)
127from isodate.isoerror import ISO8601Error
128from pendulum import DateTime as Pendulum, Duration
129from pendulum.parsing.exceptions import ParserError
130from semantic_version import Version
131from sqlalchemy import util
132from sqlalchemy.dialects import mysql
133from sqlalchemy.engine.interfaces import Dialect
134from sqlalchemy.ext.compiler import compiles
135from sqlalchemy.orm.relationships import RelationshipProperty
136from sqlalchemy.sql.elements import conv
137from sqlalchemy.sql.expression import text
138from sqlalchemy.sql.functions import FunctionElement
139from sqlalchemy.sql.schema import Column
140from sqlalchemy.sql.sqltypes import (
141 Boolean,
142 CHAR,
143 DateTime,
144 LargeBinary,
145 String,
146 Text,
147 Unicode,
148 UnicodeText,
149)
150from sqlalchemy.sql.type_api import TypeDecorator
152from camcops_server.cc_modules.cc_constants import PV, StringLengths
153from camcops_server.cc_modules.cc_simpleobjects import IdNumReference
154from camcops_server.cc_modules.cc_sqlalchemy import (
155 LONG_COLUMN_NAME_WARNING_LIMIT,
156)
157from camcops_server.cc_modules.cc_version import make_version
159if TYPE_CHECKING:
160 from sqlalchemy.sql.elements import ClauseElement # noqa: F401
161 from sqlalchemy.sql.compiler import SQLCompiler # noqa: F401
162 from camcops_server.cc_modules.cc_db import GenericTabletRecordMixin # noqa: E501,F401
164log = BraceStyleAdapter(logging.getLogger(__name__))
166# =============================================================================
167# Debugging options
168# =============================================================================
170DEBUG_DATETIME_AS_ISO_TEXT = False
171DEBUG_DURATION_AS_ISO_TEXT = False
172DEBUG_IDNUMDEF_LIST = False
173DEBUG_INT_LIST_COLTYPE = False
174DEBUG_SEMANTIC_VERSION = False
175DEBUG_STRING_LIST_COLTYPE = False
177if any([DEBUG_DATETIME_AS_ISO_TEXT,
178 DEBUG_DURATION_AS_ISO_TEXT,
179 DEBUG_SEMANTIC_VERSION,
180 DEBUG_IDNUMDEF_LIST,
181 DEBUG_INT_LIST_COLTYPE,
182 DEBUG_STRING_LIST_COLTYPE]):
183 log.warning("Debugging options enabled!")
185# =============================================================================
186# Constants
187# =============================================================================
190class RelationshipInfo(object):
191 """
192 Used as keys the ``info`` (user-defined) dictionary parameter to SQLAlchemy
193 ``relationship`` calls; see
194 https://docs.sqlalchemy.org/en/latest/orm/relationship_api.html#sqlalchemy.orm.relationship.
195 """ # noqa
196 IS_ANCILLARY = "is_ancillary"
197 IS_BLOB = "is_blob"
200# =============================================================================
201# Simple derivative column types
202# =============================================================================
203# If you insert something too long into a VARCHAR, it just gets truncated.
205AuditSourceColType = String(length=StringLengths.AUDIT_SOURCE_MAX_LEN)
207# BigIntUnsigned = Integer().with_variant(mysql.BIGINT(unsigned=True), 'mysql')
208# ... partly because Alembic breaks on variants (Aug 2017), and partly because
209# it's nonstandard and unnecessary, changed all BigIntUnsigned to
210# BigInteger (2017-08-25).
212CharColType = String(length=1)
213CharsetColType = String(length=StringLengths.CHARSET_MAX_LEN)
214CurrencyColType = Unicode(length=StringLengths.CURRENCY_MAX_LEN)
216DatabaseTitleColType = Unicode(length=StringLengths.DATABASE_TITLE_MAX_LEN)
217DeviceNameColType = String(length=StringLengths.DEVICE_NAME_MAX_LEN)
218DiagnosticCodeColType = String(length=StringLengths.DIAGNOSTIC_CODE_MAX_LEN)
220EmailAddressColType = Unicode(length=StringLengths.EMAIL_ADDRESS_MAX_LEN)
221EraColType = String(length=StringLengths.ISO8601_DATETIME_STRING_MAX_LEN)
222ExportRecipientNameColType = String(
223 length=StringLengths.EXPORT_RECIPIENT_NAME_MAX_LEN)
224ExportTransmissionMethodColType = String(
225 length=StringLengths.SENDING_FORMAT_MAX_LEN)
227FilterTextColType = Unicode(length=StringLengths.FILTER_TEXT_MAX_LEN)
228FileSpecColType = Unicode(length=StringLengths.FILESPEC_MAX_LEN)
229FullNameColType = Unicode(length=StringLengths.FULLNAME_MAX_LEN)
231GroupDescriptionColType = Unicode(
232 length=StringLengths.GROUP_DESCRIPTION_MAX_LEN)
233GroupNameColType = Unicode(length=StringLengths.GROUP_NAME_MAX_LEN)
235HashedPasswordColType = String(length=StringLengths.HASHED_PW_MAX_LEN)
236# ... You might think that we must ensure case-SENSITIVE comparison on this
237# field. That would require the option collation='utf8mb4_bin' to String(),
238# for MySQL. However, that is MySQL-specific, and SQLAlchemy currently (Oct
239# 2017) doesn't support database-specific *per-column* collations. SQLite
240# accepts COLLATE commands but chokes on 'utf8mb4_bin'. Now, the hashed
241# password from bcrypt() is case-sensitive. HOWEVER, the important thing is
242# that we always retrieve the string from the database and do a case-sensitive
243# comparison in Python (see calls to is_password_valid()). So the database
244# collation doesn't matter. So we don't set it.
245# See further notes in cc_sqlalchemy.py
246HL7AssigningAuthorityType = String(length=StringLengths.HL7_AA_MAX_LEN)
247HL7IdTypeType = String(length=StringLengths.HL7_ID_TYPE_MAX_LEN)
248HostnameColType = String(length=StringLengths.HOSTNAME_MAX_LEN)
250IdDescriptorColType = Unicode(length=StringLengths.ID_DESCRIPTOR_MAX_LEN)
251IdPolicyColType = String(length=StringLengths.ID_POLICY_MAX_LEN)
252# IntUnsigned = Integer().with_variant(mysql.INTEGER(unsigned=True), 'mysql')
253IPAddressColType = String(length=StringLengths.IP_ADDRESS_MAX_LEN)
254# This is a plain string.
255# See also e.g. http://sqlalchemy-utils.readthedocs.io/en/latest/_modules/sqlalchemy_utils/types/ip_address.html # noqa
257LanguageCodeColType = String(length=StringLengths.LANGUAGE_CODE_MAX_LEN)
259# Large BLOB:
260# https://stackoverflow.com/questions/43791725/sqlalchemy-how-to-make-a-longblob-column-in-mysql # noqa
261# One of these:
262# noinspection PyTypeChecker
263LongBlob = LargeBinary().with_variant(mysql.LONGBLOB, "mysql")
264# LongBlob = LargeBinary(length=LONGBLOB_LONGTEXT_MAX_LEN) # doesn't translate to SQL Server # noqa
266# noinspection PyTypeChecker
267LongText = UnicodeText().with_variant(mysql.LONGTEXT, "mysql")
268# LongText = UnicodeText(length=LONGBLOB_LONGTEXT_MAX_LEN) # doesn't translate to SQL Server # noqa
270MimeTypeColType = String(length=StringLengths.MIMETYPE_MAX_LEN)
272PatientNameColType = Unicode(length=StringLengths.PATIENT_NAME_MAX_LEN)
274Rfc2822DateColType = String(length=StringLengths.RFC_2822_DATE_MAX_LEN)
276SessionTokenColType = String(length=StringLengths.SESSION_TOKEN_MAX_LEN)
277SexColType = String(length=1)
278SummaryCategoryColType = String(
279 length=StringLengths.TASK_SUMMARY_TEXT_FIELD_DEFAULT_MAX_LEN)
280# ... pretty generic
282TableNameColType = String(length=StringLengths.TABLENAME_MAX_LEN)
284UrlColType = String(length=StringLengths.URL_MAX_LEN)
285UserNameCamcopsColType = String(length=StringLengths.USERNAME_CAMCOPS_MAX_LEN)
286UserNameExternalColType = String(
287 length=StringLengths.USERNAME_EXTERNAL_MAX_LEN)
290# =============================================================================
291# Helper operations for PendulumDateTimeAsIsoTextColType
292# =============================================================================
293# Database string format is e.g.
294# 2013-07-24T20:04:07.123456+01:00
295# 2013-07-24T20:04:07.123+01:00
296# 0 1 2 3 } position in string; 1-based
297# 12345678901234567890123456789012 }
298#
299# So: rightmost 6 characters are time zone; rest is date/time.
300# leftmost 23 characters are time up to millisecond precision.
301# overall length is typically 29 (milliseconds) or 32 (microseconds)
303_TZ_LEN = 6 # length of the timezone part of the ISO8601 string
304_UTC_TZ_LITERAL = "'+00:00'"
305_SQLITE_DATETIME_FMT_FOR_PYTHON = "'%Y-%m-%d %H:%M:%f'"
307_MYSQL_DATETIME_LEN = 19
308_SQLSERVER_DATETIME_LEN = 19
309_SQLSERVER_DATETIME2_LEN = 27
312# -----------------------------------------------------------------------------
313# isotzdatetime_to_utcdatetime
314# -----------------------------------------------------------------------------
316# noinspection PyPep8Naming
317class isotzdatetime_to_utcdatetime(FunctionElement):
318 """
319 Used as an SQL operation by :class:`PendulumDateTimeAsIsoTextColType`.
321 Creates an SQL expression wrapping a field containing our ISO-8601 text,
322 making a ``DATETIME`` out of it, in the UTC timezone.
324 Implemented for different SQL dialects.
325 """
326 type = DateTime()
327 name = 'isotzdatetime_to_utcdatetime'
330# noinspection PyUnusedLocal
331@compiles(isotzdatetime_to_utcdatetime)
332def isotzdatetime_to_utcdatetime_default(
333 element: "ClauseElement",
334 compiler: "SQLCompiler", **kw) -> None:
335 """
336 Default implementation for :class:`isotzdatetime_to_utcdatetime`: fail.
337 """
338 fail_unknown_dialect(compiler, "perform isotzdatetime_to_utcdatetime")
341# noinspection PyUnusedLocal
342@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.MYSQL)
343def isotzdatetime_to_utcdatetime_mysql(
344 element: "ClauseElement",
345 compiler: "SQLCompiler", **kw) -> str:
346 """
347 Implementation of :class:`isotzdatetime_to_utcdatetime` for MySQL.
349 For format, see
350 https://dev.mysql.com/doc/refman/5.5/en/date-and-time-functions.html#function_date-format
352 Note the use of "%i" for minutes.
354 Things after ``func.`` get passed to the database engine as literal SQL
355 functions; https://docs.sqlalchemy.org/en/latest/core/tutorial.html
356 """ # noqa
357 x = fetch_processed_single_clause(element, compiler)
359 # Let's do this in a clear way:
360 date_time_part = f"LEFT({x}, LENGTH({x}) - {_TZ_LEN})"
361 # ... drop the rightmost 6 chars (the timezone component)
362 fmt = compiler.process(text("'%Y-%m-%dT%H:%i:%S.%f'"))
363 # ... the text() part deals with the necessary escaping of % for the DBAPI
364 the_date_time = f"STR_TO_DATE({date_time_part}, {fmt})"
365 # ... STR_TO_DATE() returns a DATETIME if the string contains both date and
366 # time components.
367 old_timezone = f"RIGHT({x}, {_TZ_LEN})"
368 result_utc = (
369 f"CONVERT_TZ({the_date_time}, {old_timezone}, {_UTC_TZ_LITERAL})"
370 )
372 # log.warning(result_utc)
373 return result_utc
376# noinspection PyUnusedLocal
377@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.SQLITE)
378def isotzdatetime_to_utcdatetime_sqlite(
379 element: "ClauseElement",
380 compiler: "SQLCompiler", **kw) -> str:
381 """
382 Implementation of :class:`isotzdatetime_to_utcdatetime` for SQLite.
384 - https://sqlite.org/lang_corefunc.html#substr
385 - https://sqlite.org/lang_datefunc.html
386 - https://www.sqlite.org/lang_expr.html
388 Get an SQL expression for the timezone adjustment in hours.
389 Note that if a time is 12:00+01:00, that means e.g. midday BST, which
390 is 11:00+00:00 or 11:00 UTC. So you SUBTRACT the displayed timezone from
391 the time, which I've always thought is a bit odd.
393 Ha! Was busy implementing this, but SQLite is magic; if there's a
394 timezone at the end, ``STRFTIME()`` will convert it to UTC automatically!
395 Moreover, the format is the OUTPUT format that a Python datetime will
396 recognize, so no 'T'.
398 The output format is like this: ``2018-06-01 00:00:00.000``. Note that
399 SQLite provides millisecond precision only (in general and via the ``%f``
400 argument to ``STRFTIME``).
402 See also SQLAlchemy's DATETIME support for SQLite:
404 - https://docs.sqlalchemy.org/en/13/dialects/sqlite.html?highlight=sqlite#sqlalchemy.dialects.sqlite.DATETIME
406 ... but that doesn't support timezones, so that doesn't help us.
408 One further problem -- see
409 :class:`camcops_server.tasks.core10.Core10ReportDateRangeTests` -- is that
410 comparisons are done by SQLite as text, so e.g.
412 .. code-block:: sql
414 SELECT '2018-06-01 00:00:00.000' >= '2018-06-01 00:00:00.000000'; -- 0, false
415 SELECT '2018-06-01 00:00:00.000' >= '2018-06-01 00:00:00.000'; -- 1, true
417 and therefore we need to ensure either that the SQLite side gets translated
418 to 6dp, or the bind param gets translated to 3dp. I don't think we can
419 always have control over the bind parameter. So we append '000' to the
420 SQLite side.
422 """ # noqa
423 x = fetch_processed_single_clause(element, compiler)
424 fmt = compiler.process(text(_SQLITE_DATETIME_FMT_FOR_PYTHON))
425 result = f"(STRFTIME({fmt}, {x}) || '000')"
426 # log.warning(result)
427 return result
430# noinspection PyUnusedLocal
431@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.SQLSERVER)
432def isotzdatetime_to_utcdatetime_sqlserver(
433 element: "ClauseElement",
434 compiler: "SQLCompiler", **kw) -> str:
435 """
436 Implementation of :class:`isotzdatetime_to_utcdatetime` for SQL Server.
438 **Converting strings to DATETIME values**
440 - ``CAST()``: Part of ANSI SQL.
441 - ``CONVERT()``: Not part of ANSI SQL; has some extra formatting options.
443 Both methods work:
445 .. code-block:: sql
447 SELECT CAST('2001-01-31T21:30:49.123' AS DATETIME) AS via_cast,
448 CONVERT(DATETIME, '2001-01-31T21:30:49.123') AS via_convert;
450 ... fine on SQL Server 2005, with milliseconds in both cases.
451 However, going beyond milliseconds doesn't fail gracefully, it causes an
452 error (e.g. "...21:30.49.123456") both for CAST and CONVERT.
454 The ``DATETIME2`` format accepts greater precision, but requires SQL Server
455 2008 or higher. Then this works:
457 .. code-block:: sql
459 SELECT CAST('2001-01-31T21:30:49.123456' AS DATETIME2) AS via_cast,
460 CONVERT(DATETIME2, '2001-01-31T21:30:49.123456') AS via_convert;
462 So as not to be too optimistic: ``CAST(x AS DATETIME2)`` ignores (silently)
463 any timezone information in the string. So does ``CONVERT(DATETIME2, x, {0
464 or 1})``.
466 **Converting between time zones**
468 NO TIME ZONE SUPPORT in SQL Server 2005.
469 e.g. https://stackoverflow.com/questions/3200827/how-to-convert-timezones-in-sql-server-2005.
471 .. code-block:: none
473 TODATETIMEOFFSET(expression, time_zone):
474 expression: something that evaluates to a DATETIME2 value
475 time_zone: integer minutes, or string hours/minutes e.g. "+13.00"
476 -> produces a DATETIMEOFFSET value
478 Available from SQL Server 2008
479 (https://docs.microsoft.com/en-us/sql/t-sql/functions/todatetimeoffset-transact-sql).
481 .. code-block:: none
483 SWITCHOFFSET
484 -> converts one DATETIMEOFFSET value to another, preserving its UTC
485 time, but changing the displayed (local) time zone.
487 ... however, is that unnecessary? We want a plain ``DATETIME2`` in UTC, and
488 .conversion to UTC is automatically achieved by ``CONVERT(DATETIME2,
489 .some_datetimeoffset, 1)``
491 ... https://stackoverflow.com/questions/4953903/how-can-i-convert-a-sql-server-2008-datetimeoffset-to-a-datetime
493 ... but not by ``CAST(some_datetimeoffset AS DATETIME2)``, and not by
494 ``CONVERT(DATETIME2, some_datetimeoffset, 0)``
496 ... and styles 0 and 1 are the only ones permissible from SQL Server 2012
497 and up (empirically, and documented for the reverse direction at
498 https://docs.microsoft.com/en-us/sql/t-sql/functions/cast-and-convert-transact-sql?view=sql-server-2017)
500 ... this is not properly documented re UTC conversion, as far as I can
501 see. Let's use ``SWITCHOFFSET -> CAST`` to be explicit and clear.
503 ``AT TIME ZONE``: From SQL Server 2016 only.
504 https://docs.microsoft.com/en-us/sql/t-sql/queries/at-time-zone-transact-sql?view=sql-server-2017
506 **Therefore**
508 - We need to require SQL Server 2008 or higher.
509 - Therefore we can use the ``DATETIME2`` type.
510 - Note that ``LEN()``, not ``LENGTH()``, is ANSI SQL; SQL Server only
511 supports ``LEN``.
513 **Example (tested on SQL Server 2014)**
515 .. code-block:: sql
517 DECLARE @source AS VARCHAR(100) = '2001-01-31T21:30:49.123456+07:00';
519 SELECT CAST(
520 SWITCHOFFSET(
521 TODATETIMEOFFSET(
522 CAST(LEFT(@source, LEN(@source) - 6) AS DATETIME2),
523 RIGHT(@source, 6)
524 ),
525 '+00:00'
526 )
527 AS DATETIME2
528 ) -- 2001-01-31 14:30:49.1234560
530 """ # noqa
531 x = fetch_processed_single_clause(element, compiler)
533 date_time_part = f"LEFT({x}, LEN({x}) - {_TZ_LEN})" # a VARCHAR
534 old_timezone = f"RIGHT({x}, {_TZ_LEN})" # a VARCHAR
535 date_time_no_tz = f"CAST({date_time_part} AS DATETIME2)" # a DATETIME2
536 date_time_offset_with_old_tz = (
537 f"TODATETIMEOFFSET({date_time_no_tz}, {old_timezone})"
538 # a DATETIMEOFFSET
539 )
540 date_time_offset_with_utc_tz = (
541 f"SWITCHOFFSET({date_time_offset_with_old_tz}, {_UTC_TZ_LITERAL})"
542 # a DATETIMEOFFSET in UTC
543 )
544 result_utc = f"CAST({date_time_offset_with_utc_tz} AS DATETIME2)"
546 # log.warning(result_utc)
547 return result_utc
550# -----------------------------------------------------------------------------
551# unknown_field_to_utcdatetime
552# -----------------------------------------------------------------------------
554# noinspection PyPep8Naming
555class unknown_field_to_utcdatetime(FunctionElement):
556 """
557 Used as an SQL operation by :class:`PendulumDateTimeAsIsoTextColType`.
559 Creates an SQL expression wrapping a field containing something unknown,
560 which might be a ``DATETIME`` or an ISO-formatted field, and
561 making a ``DATETIME`` out of it, in the UTC timezone.
563 Implemented for different SQL dialects.
564 """
565 type = DateTime()
566 name = 'unknown_field_to_utcdatetime'
569# noinspection PyUnusedLocal
570@compiles(unknown_field_to_utcdatetime)
571def unknown_field_to_utcdatetime_default(
572 element: "ClauseElement",
573 compiler: "SQLCompiler", **kw) -> None:
574 """
575 Default implementation for :class:`unknown_field_to_utcdatetime`: fail.
576 """
577 fail_unknown_dialect(compiler, "perform unknown_field_to_utcdatetime")
580# noinspection PyUnusedLocal
581@compiles(unknown_field_to_utcdatetime, SqlaDialectName.MYSQL)
582def unknown_field_to_utcdatetime_mysql(
583 element: "ClauseElement",
584 compiler: "SQLCompiler", **kw) -> str:
585 """
586 Implementation of :class:`unknown_field_to_utcdatetime` for MySQL.
588 If it's the length of a plain ``DATETIME`` e.g. ``2013-05-30 00:00:00``
589 (19), leave it as a ``DATETIME``; otherwise convert ISO -> ``DATETIME``.
590 """
591 x = fetch_processed_single_clause(element, compiler)
592 converted = isotzdatetime_to_utcdatetime_mysql(element, compiler, **kw)
593 result = f"IF(LENGTH({x}) = {_MYSQL_DATETIME_LEN}, {x}, {converted})"
594 # log.warning(result)
595 return result
598# noinspection PyUnusedLocal
599@compiles(unknown_field_to_utcdatetime, SqlaDialectName.SQLITE)
600def unknown_field_to_utcdatetime_sqlite(
601 element: "ClauseElement",
602 compiler: "SQLCompiler", **kw) -> str:
603 """
604 Implementation of :class:`unknown_field_to_utcdatetime` for SQLite.
605 """
606 x = fetch_processed_single_clause(element, compiler)
607 fmt = compiler.process(text(_SQLITE_DATETIME_FMT_FOR_PYTHON))
608 result = f"STRFTIME({fmt}, {x})"
609 # log.warning(result)
610 return result
613# noinspection PyUnusedLocal
614@compiles(unknown_field_to_utcdatetime, SqlaDialectName.SQLSERVER)
615def unknown_field_to_utcdatetime_sqlserver(
616 element: "ClauseElement",
617 compiler: "SQLCompiler", **kw) -> str:
618 """
619 Implementation of :class:`unknown_field_to_utcdatetime` for SQL Server.
621 We should cope also with the possibility of a ``DATETIME2`` field, not just
622 ``DATETIME``. It seems consistent that ``LEN(DATETIME2) = 27``, with
623 precision tenth of a microsecond, e.g. ``2001-01-31 21:30:49.1234567``
624 (27).
626 So, if it looks like a ``DATETIME`` or a ``DATETIME2``, then we leave it
627 alone; otherwise we put it through our ISO-to-datetime function.
629 Importantly, note that neither ``_SQLSERVER_DATETIME_LEN`` nor
630 ``_SQLSERVER_DATETIME2_LEN`` are the length of any of our ISO strings.
631 """
632 x = fetch_processed_single_clause(element, compiler)
633 # https://stackoverflow.com/questions/5487892/sql-server-case-when-or-then-else-end-the-or-is-not-supported # noqa
634 converted = isotzdatetime_to_utcdatetime_sqlserver(element, compiler, **kw)
635 result = (
636 f"CASE WHEN LEN({x}) IN "
637 f"({_SQLSERVER_DATETIME_LEN}, {_SQLSERVER_DATETIME2_LEN}) THEN {x} "
638 f"ELSE {converted} "
639 f"END"
640 )
641 # log.warning(result)
642 return result
645# =============================================================================
646# Custom date/time field as ISO-8601 text including timezone, using
647# pendulum.DateTime on the Python side.
648# =============================================================================
650class PendulumDateTimeAsIsoTextColType(TypeDecorator):
651 """
652 Stores date/time values as ISO-8601, in a specific format.
653 Uses Pendulum on the Python side.
654 """
656 impl = String(length=StringLengths.ISO8601_DATETIME_STRING_MAX_LEN)
657 # ... underlying SQL type
659 _coltype_name = "PendulumDateTimeAsIsoTextColType"
661 @property
662 def python_type(self) -> type:
663 """
664 The Python type of the object.
665 """
666 return Pendulum
668 @staticmethod
669 def pendulum_to_isostring(x: PotentialDatetimeType) -> Optional[str]:
670 """
671 From a Python datetime to an ISO-formatted string in our particular
672 format.
673 """
674 # https://docs.python.org/3.4/library/datetime.html#strftime-strptime-behavior # noqa
675 x = coerce_to_pendulum(x)
676 try:
677 mainpart = x.strftime("%Y-%m-%dT%H:%M:%S.%f") # microsecond accuracy # noqa
678 timezone = x.strftime("%z") # won't have the colon in
679 return mainpart + timezone[:-2] + ":" + timezone[-2:]
680 except AttributeError:
681 return None
683 @staticmethod
684 def isostring_to_pendulum(x: Optional[str]) -> Optional[Pendulum]:
685 """
686 From an ISO-formatted string to a Python Pendulum, with timezone.
687 """
688 try:
689 return coerce_to_pendulum(x)
690 except (ParserError, ValueError):
691 log.warning("Bad ISO date/time string: {!r}", x)
692 return None
694 def process_bind_param(self, value: Optional[Pendulum],
695 dialect: Dialect) -> Optional[str]:
696 """
697 Convert parameters on the way from Python to the database.
698 """
699 retval = self.pendulum_to_isostring(value)
700 if DEBUG_DATETIME_AS_ISO_TEXT:
701 log.warning(
702 "{}.process_bind_param("
703 "self={!r}, value={!r}, dialect={!r}) -> {!r}",
704 self._coltype_name, self, value, dialect, retval)
705 return retval
707 def process_literal_param(self, value: Optional[Pendulum],
708 dialect: Dialect) -> Optional[str]:
709 """
710 Convert literals on the way from Python to the database.
711 """
712 retval = self.pendulum_to_isostring(value)
713 if DEBUG_DATETIME_AS_ISO_TEXT:
714 log.warning(
715 "{}.process_literal_param("
716 "self={!r}, value={!r}, dialect={!r}) -> {!r}",
717 self._coltype_name, self, value, dialect, retval)
718 return retval
720 def process_result_value(self, value: Optional[str],
721 dialect: Dialect) -> Optional[Pendulum]:
722 """
723 Convert things on the way from the database to Python.
724 """
725 retval = self.isostring_to_pendulum(value)
726 if DEBUG_DATETIME_AS_ISO_TEXT:
727 log.warning(
728 "{}.process_result_value("
729 "self={!r}, value={!r}, dialect={!r}) -> {!r}",
730 self._coltype_name, self, value, dialect, retval)
731 return retval
733 # noinspection PyPep8Naming
734 class comparator_factory(TypeDecorator.Comparator):
735 """
736 Process SQL for when we are comparing our column, in the database,
737 to something else.
739 We make this dialect-independent by calling functions like
741 .. code-block:: none
743 unknown_field_to_utcdatetime
744 isotzdatetime_to_utcdatetime
746 ... which we then specialize for specific dialects.
748 This function itself does not appear to be able to access any
749 information about the dialect.
750 """
752 def operate(self, op, *other, **kwargs):
753 assert len(other) == 1
754 assert not kwargs
755 other = other[0]
756 try:
757 processed_other = convert_datetime_to_utc(
758 coerce_to_pendulum(other))
759 # - If you try to call a dialect-specialized FunctionElement,
760 # it processes the clause to "?" (meaning "attach bind
761 # parameter here"); it's not the value itself.
762 # - For our SQLite "milliseconds only" comparator problem (see
763 # above), we can't do very much here without knowing the
764 # dialect. So we make the SQLite side look like it has
765 # microseconds by appending "000"...
766 except (AttributeError, ParserError, TypeError, ValueError):
767 # OK. At this point, "other" could be a plain DATETIME field,
768 # or a PendulumDateTimeAsIsoTextColType field (or potentially
769 # something else that we don't really care about). If it's a
770 # DATETIME, then we assume it is already in UTC.
771 processed_other = unknown_field_to_utcdatetime(other)
772 if DEBUG_DATETIME_AS_ISO_TEXT:
773 log.warning("operate(self={!r}, op={!r}, other={!r})",
774 self, op, other)
775 log.warning("self.expr = {!r}", self.expr)
776 log.warning("processed_other = {!r}", processed_other)
777 # traceback.print_stack()
778 return op(isotzdatetime_to_utcdatetime(self.expr),
779 processed_other)
781 def reverse_operate(self, op, *other, **kwargs):
782 assert False, "I don't think this is ever being called"
785# =============================================================================
786# Custom duration field as ISO-8601 text, using pendulum.Duration on the Python
787# side.
788# =============================================================================
790class PendulumDurationAsIsoTextColType(TypeDecorator):
791 """
792 Stores time durations as ISO-8601, in a specific format.
793 Uses :class:`pendulum.Duration` on the Python side.
794 """
796 impl = String(length=StringLengths.ISO8601_DURATION_STRING_MAX_LEN)
797 # ... underlying SQL type
799 _coltype_name = "PendulumDurationAsIsoTextColType"
801 @property
802 def python_type(self) -> type:
803 """
804 The Python type of the object.
805 """
806 return Duration
808 @staticmethod
809 def pendulum_duration_to_isostring(x: Optional[Duration]) -> Optional[str]:
810 """
811 From a :class:`pendulum.Duration` (or ``None``) an ISO-formatted string
812 in our particular format (or ``NULL``).
813 """
814 if x is None:
815 return None
816 return duration_to_iso(x, permit_years_months=True,
817 minus_sign_at_front=True)
819 @staticmethod
820 def isostring_to_pendulum_duration(x: Optional[str]) -> Optional[Duration]:
821 """
822 From an ISO-formatted string to a Python Pendulum, with timezone.
823 """
824 if not x: # None (NULL) or blank string
825 return None
826 try:
827 return duration_from_iso(x)
828 except (ISO8601Error, ValueError):
829 log.warning("Bad ISO duration string: {!r}", x)
830 return None
832 def process_bind_param(self, value: Optional[Pendulum],
833 dialect: Dialect) -> Optional[str]:
834 """
835 Convert parameters on the way from Python to the database.
836 """
837 retval = self.pendulum_duration_to_isostring(value)
838 if DEBUG_DURATION_AS_ISO_TEXT:
839 log.warning(
840 "{}.process_bind_param("
841 "self={!r}, value={!r}, dialect={!r}) -> {!r}",
842 self._coltype_name, self, value, dialect, retval)
843 return retval
845 def process_literal_param(self, value: Optional[Pendulum],
846 dialect: Dialect) -> Optional[str]:
847 """
848 Convert literals on the way from Python to the database.
849 """
850 retval = self.pendulum_duration_to_isostring(value)
851 if DEBUG_DURATION_AS_ISO_TEXT:
852 log.warning(
853 "{}.process_literal_param("
854 "self={!r}, value={!r}, dialect={!r}) -> {!r}",
855 self._coltype_name, self, value, dialect, retval)
856 return retval
858 def process_result_value(self, value: Optional[str],
859 dialect: Dialect) -> Optional[Pendulum]:
860 """
861 Convert things on the way from the database to Python.
862 """
863 retval = self.isostring_to_pendulum_duration(value)
864 if DEBUG_DURATION_AS_ISO_TEXT:
865 log.warning(
866 "{}.process_result_value("
867 "self={!r}, value={!r}, dialect={!r}) -> {!r}",
868 self._coltype_name, self, value, dialect, retval)
869 return retval
871 # No comparator_factory; we do not use SQL to compare ISO durations.
874# =============================================================================
875# Semantic version column type
876# =============================================================================
878class SemanticVersionColType(TypeDecorator):
879 """
880 Stores semantic versions in the database.
881 Uses :class:`semantic_version.Version` on the Python side.
882 """
884 impl = String(length=147) # https://github.com/mojombo/semver/issues/79
886 _coltype_name = "SemanticVersionColType"
888 @property
889 def python_type(self) -> type:
890 """
891 The Python type of the object.
892 """
893 return Version
895 def process_bind_param(self, value: Optional[Version],
896 dialect: Dialect) -> Optional[str]:
897 """
898 Convert parameters on the way from Python to the database.
899 """
900 retval = str(value) if value is not None else None
901 if DEBUG_SEMANTIC_VERSION:
902 log.warning(
903 "{}.process_bind_param("
904 "self={!r}, value={!r}, dialect={!r}) -> {!r}",
905 self._coltype_name, self, value, dialect, retval)
906 return retval
908 def process_literal_param(self, value: Optional[Version],
909 dialect: Dialect) -> Optional[str]:
910 """
911 Convert literals on the way from Python to the database.
912 """
913 retval = str(value) if value is not None else None
914 if DEBUG_SEMANTIC_VERSION:
915 log.warning(
916 "{}.process_literal_param("
917 "self={!r}, value={!r}, dialect={!r}) -> !r",
918 self._coltype_name, self, value, dialect, retval)
919 return retval
921 def process_result_value(self, value: Optional[str],
922 dialect: Dialect) -> Optional[Version]:
923 """
924 Convert things on the way from the database to Python.
925 """
926 if value is None:
927 retval = None
928 else:
929 # Here we do some slightly fancier conversion to deal with all
930 # sorts of potential rubbish coming in, so we get a properly
931 # ordered Version out:
932 retval = make_version(value)
933 if DEBUG_SEMANTIC_VERSION:
934 log.warning(
935 "{}.process_result_value("
936 "self={!r}, value={!r}, dialect={!r}) -> {!r}",
937 self._coltype_name, self, value, dialect, retval)
938 return retval
940 '''
941 # noinspection PyPep8Naming
942 class comparator_factory(TypeDecorator.Comparator):
943 """
944 Process SQL for when we are comparing our column, in the database,
945 to something else.
947 See https://docs.sqlalchemy.org/en/13/core/type_api.html#sqlalchemy.types.TypeEngine.comparator_factory.
949 .. warning::
951 I'm not sure this is either (a) correct or (b) used; it may
952 produce a string comparison of e.g. ``14.0.0`` versus ``2.0.0``,
953 which will be alphabetical and therefore wrong.
954 Disabled on 2019-04-28.
956 """ # noqa
958 def operate(self, op, *other, **kwargs):
959 assert len(other) == 1
960 assert not kwargs
961 other = other[0]
962 if isinstance(other, Version):
963 processed_other = str(Version)
964 else:
965 processed_other = other
966 return op(self.expr, processed_other)
968 def reverse_operate(self, op, *other, **kwargs):
969 assert False, "I don't think this is ever being called"
970 '''
973# =============================================================================
974# IdNumReferenceListColType
975# =============================================================================
977class IdNumReferenceListColType(TypeDecorator):
978 """
979 Stores a list of IdNumReference objects.
980 On the database side, uses a comma-separated list of integers.
981 """
983 impl = Text()
984 _coltype_name = "IdNumReferenceListColType"
986 @property
987 def python_type(self) -> type:
988 """
989 The Python type of the object.
990 """
991 return list
993 @staticmethod
994 def _idnumdef_list_to_dbstr(
995 idnumdef_list: Optional[List[IdNumReference]]) -> str:
996 """
997 Converts an optional list of
998 :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
999 objects to a CSV string suitable for storing in the database.
1000 """
1001 if not idnumdef_list:
1002 return ""
1003 elements = [] # type: List[int]
1004 for idnumdef in idnumdef_list:
1005 elements.append(idnumdef.which_idnum)
1006 elements.append(idnumdef.idnum_value)
1007 return ",".join(str(x) for x in elements)
1009 @staticmethod
1010 def _dbstr_to_idnumdef_list(dbstr: Optional[str]) -> List[IdNumReference]:
1011 """
1012 Converts a CSV string (from the database) to a list of
1013 :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
1014 objects.
1015 """
1016 idnumdef_list = [] # type: List[IdNumReference]
1017 try:
1018 intlist = [int(numstr) for numstr in dbstr.split(",")]
1019 except (AttributeError, TypeError, ValueError):
1020 return []
1021 length = len(intlist)
1022 if length == 0 or length % 2 != 0: # enforce pairs
1023 return []
1024 for which_idnum, idnum_value in chunks(intlist, n=2):
1025 if which_idnum < 0 or idnum_value < 0: # enforce positive integers
1026 return []
1027 idnumdef_list.append(IdNumReference(which_idnum=which_idnum,
1028 idnum_value=idnum_value))
1029 return idnumdef_list
1031 def process_bind_param(self, value: Optional[List[IdNumReference]],
1032 dialect: Dialect) -> str:
1033 """
1034 Convert parameters on the way from Python to the database.
1035 """
1036 retval = self._idnumdef_list_to_dbstr(value)
1037 if DEBUG_IDNUMDEF_LIST:
1038 log.warning(
1039 "{}.process_bind_param("
1040 "self={!r}, value={!r}, dialect={!r}) -> {!r}",
1041 self._coltype_name, self, value, dialect, retval)
1042 return retval
1044 def process_literal_param(self, value: Optional[List[IdNumReference]],
1045 dialect: Dialect) -> str:
1046 """
1047 Convert literals on the way from Python to the database.
1048 """
1049 retval = self._idnumdef_list_to_dbstr(value)
1050 if DEBUG_IDNUMDEF_LIST:
1051 log.warning(
1052 "{}.process_literal_param("
1053 "self={!r}, value={!r}, dialect={!r}) -> !r",
1054 self._coltype_name, self, value, dialect, retval)
1055 return retval
1057 def process_result_value(self, value: Optional[str],
1058 dialect: Dialect) -> List[IdNumReference]:
1059 """
1060 Convert things on the way from the database to Python.
1061 """
1062 retval = self._dbstr_to_idnumdef_list(value)
1063 if DEBUG_IDNUMDEF_LIST:
1064 log.warning(
1065 "{}.process_result_value("
1066 "self={!r}, value={!r}, dialect={!r}) -> {!r}",
1067 self._coltype_name, self, value, dialect, retval)
1068 return retval
1071# =============================================================================
1072# UUID column type
1073# =============================================================================
1075class UuidColType(TypeDecorator):
1076 # Based on:
1077 # https://docs.sqlalchemy.org/en/13/core/custom_types.html#backend-agnostic-guid-type # noqa: E501
1078 # which will use postgresql UUID if relevant, not doing that here
1080 impl = CHAR(32)
1082 @property
1083 def python_type(self) -> type:
1084 return str
1086 def process_bind_param(self, value: uuid.UUID,
1087 dialect: Dialect) -> Optional[str]:
1088 if value is None:
1089 return None
1091 return "%.32x" % value.int
1093 def process_result_value(self, value: Optional[str],
1094 dialect: Dialect) -> Optional[uuid.UUID]:
1095 if value is None:
1096 return None
1098 return uuid.UUID(value)
1101# =============================================================================
1102# JSON column type
1103# =============================================================================
1105class JsonColType(TypeDecorator):
1106 # Unlike
1107 # https://docs.sqlalchemy.org/en/13/core/type_basics.html#sqlalchemy.types.JSON
1108 # does not use vendor-specific JSON type
1109 impl = UnicodeText
1111 @property
1112 def python_type(self) -> type:
1113 return str
1115 def process_bind_param(self, value: Any,
1116 dialect: Dialect) -> Optional[str]:
1117 if value is None:
1118 return None
1120 return json.dumps(value)
1122 def process_result_value(self, value: str, dialect: Dialect) -> Any:
1123 if value is None:
1124 return None
1126 return json.loads(value)
1129# =============================================================================
1130# PermittedValueChecker: used by CamcopsColumn
1131# =============================================================================
1133class PermittedValueChecker(object):
1134 """
1135 Represents permitted values (in columns belonging to CamCOPS tasks), and
1136 checks a value against them.
1137 """
1138 def __init__(self,
1139 not_null: bool = False,
1140 minimum: Union[int, float] = None,
1141 maximum: Union[int, float] = None,
1142 permitted_values: List[Any] = None) -> None:
1143 """
1144 Args:
1145 not_null: must the value not be NULL?
1146 minimum: if specified, a numeric minimum value
1147 maximum: if specified, a numeric maximum value
1148 permitted_values: if specified, a list of permitted values
1149 """
1150 self.not_null = not_null
1151 self.minimum = minimum
1152 self.maximum = maximum
1153 self.permitted_values = permitted_values
1155 def is_ok(self, value: Any) -> bool:
1156 """
1157 Does the value pass our tests?
1158 """
1159 if value is None:
1160 return not self.not_null
1161 # If not_null is True, then the value is not OK; return False.
1162 # If not_null is False, then a null value passes all other tests.
1163 if self.permitted_values is not None and value not in self.permitted_values: # noqa
1164 return False
1165 if self.minimum is not None and value < self.minimum:
1166 return False
1167 if self.maximum is not None and value > self.maximum:
1168 return False
1169 return True
1171 def failure_msg(self, value: Any) -> str:
1172 """
1173 Why does the value not pass our tests?
1174 """
1175 if value is None:
1176 if self.not_null:
1177 return "value is None and NULL values are not permitted"
1178 else:
1179 return "" # value is OK
1180 if self.permitted_values is not None and value not in self.permitted_values: # noqa
1181 return (
1182 f"value {value!r} not in permitted values "
1183 f"{self.permitted_values!r}"
1184 )
1185 if self.minimum is not None and value < self.minimum:
1186 return f"value {value!r} less than minimum of {self.minimum!r}"
1187 if self.maximum is not None and value > self.maximum:
1188 return f"value {value!r} more than maximum of {self.maximum!r}"
1189 return ""
1191 def __repr__(self):
1192 return auto_repr(self)
1194 def permitted_values_csv(self) -> str:
1195 """
1196 Returns a CSV representation of the permitted values.
1198 Primarily used for CRIS data dictionaries.
1199 """
1200 if self.permitted_values:
1201 return ",".join(str(x) for x in self.permitted_values)
1202 # Take a punt that integer minima/maxima mean that only integers are
1203 # permitted...
1204 if isinstance(self.minimum, int) and isinstance(self.maximum, int):
1205 return ",".join(
1206 str(x) for x in range(self.minimum, self.maximum + 1))
1207 return ""
1210# Specific instances, to reduce object duplication and magic numbers:
1212MIN_ZERO_CHECKER = PermittedValueChecker(minimum=0)
1214BIT_CHECKER = PermittedValueChecker(permitted_values=PV.BIT)
1215ZERO_TO_ONE_CHECKER = PermittedValueChecker(minimum=0, maximum=1)
1216ZERO_TO_TWO_CHECKER = PermittedValueChecker(minimum=0, maximum=2)
1217ZERO_TO_THREE_CHECKER = PermittedValueChecker(minimum=0, maximum=3)
1218ZERO_TO_FOUR_CHECKER = PermittedValueChecker(minimum=0, maximum=4)
1219ZERO_TO_FIVE_CHECKER = PermittedValueChecker(minimum=0, maximum=5)
1220ZERO_TO_SIX_CHECKER = PermittedValueChecker(minimum=0, maximum=6)
1221ZERO_TO_SEVEN_CHECKER = PermittedValueChecker(minimum=0, maximum=7)
1222ZERO_TO_EIGHT_CHECKER = PermittedValueChecker(minimum=0, maximum=8)
1223ZERO_TO_NINE_CHECKER = PermittedValueChecker(minimum=0, maximum=9)
1224ZERO_TO_10_CHECKER = PermittedValueChecker(minimum=0, maximum=10)
1225ZERO_TO_100_CHECKER = PermittedValueChecker(minimum=0, maximum=100)
1227ONE_TO_TWO_CHECKER = PermittedValueChecker(minimum=1, maximum=2)
1228ONE_TO_THREE_CHECKER = PermittedValueChecker(minimum=1, maximum=3)
1229ONE_TO_FOUR_CHECKER = PermittedValueChecker(minimum=1, maximum=4)
1230ONE_TO_FIVE_CHECKER = PermittedValueChecker(minimum=1, maximum=5)
1231ONE_TO_SIX_CHECKER = PermittedValueChecker(minimum=1, maximum=6)
1232ONE_TO_SEVEN_CHECKER = PermittedValueChecker(minimum=1, maximum=7)
1233ONE_TO_EIGHT_CHECKER = PermittedValueChecker(minimum=1, maximum=8)
1234ONE_TO_NINE_CHECKER = PermittedValueChecker(minimum=1, maximum=9)
1237# =============================================================================
1238# CamcopsColumn: provides extra functions over Column.
1239# =============================================================================
1241# noinspection PyAbstractClass
1242class CamcopsColumn(Column):
1243 """
1244 A SQLAlchemy :class:`Column` class that supports some CamCOPS-specific
1245 flags, such as:
1247 - whether a field is a BLOB reference;
1248 - how it should be treated for anonymisation;
1249 - which values are permitted in the field (in a soft sense: duff values
1250 cause errors to be reported, but they're still stored).
1251 """
1252 def __init__(self,
1253 *args,
1254 include_in_anon_staging_db: bool = False,
1255 exempt_from_anonymisation: bool = False,
1256 identifies_patient: bool = False,
1257 is_blob_id_field: bool = False,
1258 blob_relationship_attr_name: str = "",
1259 permitted_value_checker: PermittedValueChecker = None,
1260 **kwargs) -> None:
1261 """
1263 Args:
1264 *args:
1265 Arguments to the :class:`Column` constructor.
1266 include_in_anon_staging_db:
1267 Ensure this is marked for inclusion in data dictionaries for an
1268 anonymisation staging database.
1269 exempt_from_anonymisation:
1270 If true: though this field might be text, it is guaranteed not
1271 to contain identifiers (e.g. it might contain only predefined
1272 disease severity descriptions) and does not require
1273 anonymisation.
1274 identifies_patient:
1275 If true: contains a patient identifier (e.g. name).
1276 is_blob_id_field:
1277 If true: this field contains a reference (client FK) to the
1278 BLOB table.
1279 blob_relationship_attr_name:
1280 For BLOB ID fields: the name of the associated relationship
1281 attribute (which, when accessed, yields the BLOB itself) in
1282 the owning class/object.
1283 permitted_value_checker:
1284 If specified, a :class:`PermittedValueChecker` that allows
1285 soft constraints to be specified on the field's contents. (That
1286 is, no constraints are specified at the database level, but we
1287 can moan if incorrect data are present.)
1288 **kwargs:
1289 Arguments to the :class:`Column` constructor.
1290 """
1291 self.include_in_anon_staging_db = include_in_anon_staging_db
1292 self.exempt_from_anonymisation = exempt_from_anonymisation
1293 self.identifies_patient = identifies_patient
1294 self.is_blob_id_field = is_blob_id_field
1295 self.blob_relationship_attr_name = blob_relationship_attr_name
1296 self.permitted_value_checker = permitted_value_checker
1297 if is_blob_id_field:
1298 assert blob_relationship_attr_name, (
1299 "If specifying a BLOB ID field, must give the attribute name "
1300 "of the relationship too")
1301 super().__init__(*args, **kwargs)
1303 def _constructor(self, *args, **kwargs) -> "CamcopsColumn":
1304 """
1305 SQLAlchemy method (not clearly documented) to assist in copying
1306 objects. Returns a copy of this object.
1308 See
1309 https://bitbucket.org/zzzeek/sqlalchemy/issues/2284/please-make-column-easier-to-subclass
1310 """ # noqa
1311 kwargs['include_in_anon_staging_db'] = self.include_in_anon_staging_db
1312 kwargs['exempt_from_anonymisation'] = self.exempt_from_anonymisation
1313 kwargs['identifies_patient'] = self.identifies_patient
1314 kwargs['is_blob_id_field'] = self.is_blob_id_field
1315 kwargs['blob_relationship_attr_name'] = self.blob_relationship_attr_name # noqa
1316 kwargs['permitted_value_checker'] = self.permitted_value_checker
1317 # noinspection PyTypeChecker
1318 return self.__class__(*args, **kwargs)
1320 def __repr__(self) -> str:
1321 def kvp(attrname: str) -> str:
1322 return f"{attrname}={getattr(self, attrname)!r}"
1323 elements = [
1324 kvp("include_in_anon_staging_db"),
1325 kvp("exempt_from_anonymisation"),
1326 kvp("identifies_patient"),
1327 kvp("is_blob_id_field"),
1328 kvp("blob_relationship_attr_name"),
1329 kvp("permitted_value_checker"),
1330 f"super()={super().__repr__()}",
1331 ]
1332 return f"CamcopsColumn({', '.join(elements)})"
1334 def set_permitted_value_checker(
1335 self, permitted_value_checker: PermittedValueChecker) -> None:
1336 """
1337 Sets the :class:`PermittedValueChecker` attribute.
1338 """
1339 self.permitted_value_checker = permitted_value_checker
1342# =============================================================================
1343# Operate on Column/CamcopsColumn properties
1344# =============================================================================
1346def gen_columns_matching_attrnames(obj, attrnames: List[str]) \
1347 -> Generator[Tuple[str, Column], None, None]:
1348 """
1349 Find columns of an SQLAlchemy ORM object whose attribute names match a
1350 list.
1352 Args:
1353 obj: SQLAlchemy ORM object to inspect
1354 attrnames: attribute names
1356 Yields:
1357 ``attrname, column`` tuples
1359 """
1360 for attrname, column in gen_columns(obj):
1361 if attrname in attrnames:
1362 yield attrname, column
1365def gen_camcops_columns(obj) -> Generator[Tuple[str, CamcopsColumn],
1366 None, None]:
1367 """
1368 Finds all columns of an object that are
1369 :class:`camcops_server.cc_modules.cc_sqla_coltypes.CamcopsColumn` columns.
1371 Args:
1372 obj: SQLAlchemy ORM object to inspect
1374 Yields:
1375 ``attrname, column`` tuples
1376 """
1377 for attrname, column in gen_columns(obj):
1378 if isinstance(column, CamcopsColumn):
1379 yield attrname, column
1382def gen_camcops_blob_columns(obj) -> Generator[Tuple[str, CamcopsColumn],
1383 None, None]:
1384 """
1385 Finds all columns of an object that are
1386 :class:`camcops_server.cc_modules.cc_sqla_coltypes.CamcopsColumn` columns
1387 referencing the BLOB table.
1389 Args:
1390 obj: SQLAlchemy ORM object to inspect
1392 Yields:
1393 ``attrname, column`` tuples
1394 """
1395 for attrname, column in gen_camcops_columns(obj):
1396 if column.is_blob_id_field:
1397 if attrname != column.name:
1398 log.warning("BLOB field where attribute name {!r} != SQL "
1399 "column name {!r}", attrname, column.name)
1400 yield attrname, column
1403def get_column_attr_names(obj) -> List[str]:
1404 """
1405 Get a list of column attribute names from an SQLAlchemy ORM object.
1406 """
1407 return [attrname for attrname, _ in gen_columns(obj)]
1410def get_camcops_column_attr_names(obj) -> List[str]:
1411 """
1412 Get a list of
1413 :class:`camcops_server.cc_modules.cc_sqla_coltypes.CamcopsColumn` column
1414 attribute names from an SQLAlchemy ORM object.
1415 """
1416 return [attrname for attrname, _ in gen_camcops_columns(obj)]
1419def get_camcops_blob_column_attr_names(obj) -> List[str]:
1420 """
1421 Get a list of
1422 :class:`camcops_server.cc_modules.cc_sqla_coltypes.CamcopsColumn` BLOB
1423 column attribute names from an SQLAlchemy ORM object.
1424 """
1425 return [attrname for attrname, _ in gen_camcops_blob_columns(obj)]
1428def permitted_value_failure_msgs(obj) -> List[str]:
1429 """
1430 Checks a SQLAlchemy ORM object instance against its permitted value checks
1431 (via its :class:`camcops_server.cc_modules.cc_sqla_coltypes.CamcopsColumn`
1432 columns), if it has any.
1434 Returns a list of failure messages (empty list means all OK).
1436 If you just want to know whether it passes, a quicker way is via
1437 :func:`permitted_values_ok`.
1438 """
1439 failure_msgs = []
1440 for attrname, camcops_column in gen_camcops_columns(obj):
1441 pv_checker = camcops_column.permitted_value_checker # type: Optional[PermittedValueChecker] # noqa
1442 if pv_checker is None:
1443 continue
1444 value = getattr(obj, attrname)
1445 failure_msg = pv_checker.failure_msg(value)
1446 if failure_msg:
1447 failure_msgs.append(
1448 f"Invalid value for {attrname}: {failure_msg}")
1449 return failure_msgs
1452def permitted_values_ok(obj) -> bool:
1453 """
1454 Checks whether an instance passes its permitted value checks, if it has
1455 any.
1457 If you want to know why it failed, see
1458 :func:`permitted_value_failure_msgs`.
1459 """
1460 for attrname, camcops_column in gen_camcops_columns(obj):
1461 pv_checker = camcops_column.permitted_value_checker # type: Optional[PermittedValueChecker] # noqa
1462 if pv_checker is None:
1463 continue
1464 value = getattr(obj, attrname)
1465 if not pv_checker.is_ok(value):
1466 return False
1467 return True
1470def gen_ancillary_relationships(obj) -> Generator[
1471 Tuple[str, RelationshipProperty, Type["GenericTabletRecordMixin"]],
1472 None, None]:
1473 """
1474 For an SQLAlchemy ORM object, yields tuples of ``attrname,
1475 relationship_property, related_class`` for all relationships that are
1476 marked as a CamCOPS ancillary relationship.
1477 """
1478 for attrname, rel_prop, related_class in gen_relationships(obj):
1479 if rel_prop.info.get(RelationshipInfo.IS_ANCILLARY, None) is True:
1480 yield attrname, rel_prop, related_class
1483def gen_blob_relationships(obj) -> Generator[
1484 Tuple[str, RelationshipProperty, Type["GenericTabletRecordMixin"]],
1485 None, None]:
1486 """
1487 For an SQLAlchemy ORM object, yields tuples of ``attrname,
1488 relationship_property, related_class`` for all relationships that are
1489 marked as a CamCOPS BLOB relationship.
1490 """
1491 for attrname, rel_prop, related_class in gen_relationships(obj):
1492 if rel_prop.info.get(RelationshipInfo.IS_BLOB, None) is True:
1493 yield attrname, rel_prop, related_class
1496# =============================================================================
1497# Specializations of CamcopsColumn to save typing
1498# =============================================================================
1500def _name_type_in_column_args(args: Tuple[Any, ...]) -> Tuple[bool, bool]:
1501 """
1502 SQLAlchemy doesn't encourage deriving from Column. If you do, you have to
1503 implement ``__init__()`` and ``_constructor()`` carefully. The
1504 ``__init__()`` function will be called by user code, and via SQLAlchemy
1505 internals, including via ``_constructor`` (e.g. from
1506 ``Column.make_proxy()``).
1508 It is likely that ``__init__`` will experience many combinations of the
1509 column name and type being passed either in ``*args`` or ``**kwargs``. It
1510 must pass them on to :class:`Column`. If you don't mess with the type,
1511 that's easy; just pass them on unmodified. But if you plan to mess with the
1512 type, as we do in :class:`BoolColumn` below, we must make sure that we
1513 don't pass either of ``name`` or ``type_`` in *both* ``args`` and
1514 ``kwargs``.
1516 This function tells you whether ``name`` and ``type_`` are present in args,
1517 using the same method as ``Column.__init__()``.
1518 """
1519 name_in_args = False
1520 type_in_args = False
1521 args = list(args) # make a copy, and make it a list not a tuple
1522 if args:
1523 if isinstance(args[0], util.string_types):
1524 name_in_args = True
1525 args.pop(0)
1526 if args:
1527 coltype = args[0]
1528 if hasattr(coltype, "_sqla_type"):
1529 type_in_args = True
1530 return name_in_args, type_in_args
1533# noinspection PyAbstractClass
1534class BoolColumn(CamcopsColumn):
1535 """
1536 A :class:`camcops_server.cc_modules.cc_sqla_coltypes.CamcopsColumn`
1537 representing a boolean value.
1538 """
1539 def __init__(self, *args: Any, **kwargs: Any) -> None:
1540 # Must pass on all arguments, ultimately to Column, or when using
1541 # AbstractConcreteBase, you can get this:
1542 #
1543 # TypeError: Could not create a copy of this <class 'camcops_server.
1544 # cc_modules.cc_sqla_coltypes.BoolColumn'> object. Ensure the class
1545 # includes a _constructor() attribute or method which accepts the
1546 # standard Column constructor arguments, or references the Column class
1547 # itself.
1548 #
1549 # During internal copying, "type_" can arrive here within kwargs, so
1550 # we must make sure that we don't send it on twice to super().__init().
1551 # Also, Column.make_proxy() calls our _constructor() with name and type
1552 # in args, so we must handle that, too...
1554 _, type_in_args = _name_type_in_column_args(args)
1555 self.constraint_name = kwargs.pop("constraint_name", None) # type: Optional[str] # noqa
1556 if not type_in_args:
1557 if self.constraint_name:
1558 constraint_name_conv = conv(self.constraint_name)
1559 # ... see help for ``conv``
1560 else:
1561 constraint_name_conv = None
1562 kwargs['type_'] = Boolean(name=constraint_name_conv)
1563 # The "name" parameter to Boolean() specifies the name of the
1564 # (0, 1) constraint.
1565 kwargs['permitted_value_checker'] = BIT_CHECKER
1566 super().__init__(*args, **kwargs)
1567 if (not self.constraint_name and
1568 len(self.name) >= LONG_COLUMN_NAME_WARNING_LIMIT):
1569 log.warning(
1570 "BoolColumn with long column name and no constraint name: "
1571 "{!r}", self.name
1572 )
1574 def __repr__(self) -> str:
1575 def kvp(attrname: str) -> str:
1576 return f"{attrname}={getattr(self, attrname)!r}"
1577 elements = [
1578 kvp("constraint_name"),
1579 f"super()={super().__repr__()}",
1580 ]
1581 return f"BoolColumn({', '.join(elements)})"
1583 def _constructor(self, *args: Any, **kwargs: Any) -> "BoolColumn":
1584 """
1585 Make a copy; see
1586 https://bitbucket.org/zzzeek/sqlalchemy/issues/2284/please-make-column-easier-to-subclass
1587 """
1588 kwargs["constraint_name"] = self.constraint_name
1589 return super()._constructor(*args, **kwargs)