Coverage for cc_modules/cc_sqla_coltypes.py: 66%
491 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
1"""
2camcops_server/cc_modules/cc_sqla_coltypes.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**SQLAlchemy column types used by CamCOPS.**
28Note these built-in SQLAlchemy types
29(https://docs.sqlalchemy.org/en/latest/core/type_basics.html#generic-types):
31 =============== ===========================================================
32 SQLAlchemy type Comment
33 =============== ===========================================================
34 BigInteger MySQL: -9,223,372,036,854,775,808 to
35 9,223,372,036,854,775,807 (64-bit)
36 (compare NHS number: up to 9,999,999,999)
37 Boolean
38 Date
39 DateTime
40 Enum
41 Float
42 Integer MySQL: -2,147,483,648 to 2,147,483,647 (32-bit)
43 Interval For ``datetime.timedelta``
44 LargeBinary Under MySQL, maps to ``BLOB``
45 MatchType For the return type of the ``MATCH`` operator
46 Numeric For fixed-precision numbers like ``NUMERIC`` or ``DECIMAL``
47 PickleType
48 SchemaType
49 SmallInteger
50 String ``VARCHAR``
51 Text Variably sized string type.
52 (Under MySQL, renders as ``TEXT``.)
53 Time
54 Unicode Implies that the underlying column explicitly supports
55 Unicode
56 UnicodeText Variably sized version of Unicode
57 (Under MySQL, renders as ``TEXT`` too.)
58 =============== ===========================================================
60Not supported across all platforms:
62 =============== ===========================================================
63 SQL type Comment
64 =============== ===========================================================
65 BIGINT UNSIGNED MySQL: 0 to 18,446,744,073,709,551,615 (64-bit).
66 Use ``sqlalchemy.dialects.mysql.BIGINT(unsigned=True)``.
67 INT UNSIGNED MySQL: 0 to 4,294,967,295 (32-bit).
68 Use ``sqlalchemy.dialects.mysql.INTEGER(unsigned=True)``.
69 =============== ===========================================================
71Other MySQL sizes:
73 =============== ===========================================================
74 MySQL type Comment
75 =============== ===========================================================
76 TINYBLOB 2^8 bytes = 256 bytes
77 BLOB 2^16 bytes = 64 KiB
78 MEDIUMBLOB 2^24 bytes = 16 MiB
79 LONGBLOB 2^32 bytes = 4 GiB
80 TINYTEXT 255 (2^8 - 1) bytes
81 TEXT 65,535 bytes (2^16 - 1) = 64 KiB
82 MEDIUMTEXT 16,777,215 (2^24 - 1) bytes = 16 MiB
83 LONGTEXT 4,294,967,295 (2^32 - 1) bytes = 4 GiB
84 =============== ===========================================================
86See https://stackoverflow.com/questions/13932750/tinytext-text-mediumtext-and-longtext-maximum-storage-sizes.
88Also notes:
90- Columns may need their character set specified explicitly under MySQL:
91 https://stackoverflow.com/questions/2108824/mysql-incorrect-string-value-error-when-save-unicode-string-in-django
93""" # noqa
95# =============================================================================
96# Imports
97# =============================================================================
99import json
100import logging
101from typing import (
102 Any,
103 Generator,
104 List,
105 NoReturn,
106 Optional,
107 Sequence,
108 Tuple,
109 Type,
110 TYPE_CHECKING,
111 Union,
112)
113import uuid
115from cardinal_pythonlib.datetimefunc import (
116 coerce_to_pendulum,
117 convert_datetime_to_utc,
118 duration_from_iso,
119 duration_to_iso,
120 PotentialDatetimeType,
121)
122from cardinal_pythonlib.lists import chunks
123from cardinal_pythonlib.logs import BraceStyleAdapter
124from cardinal_pythonlib.reprfunc import auto_repr
125from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
126from cardinal_pythonlib.sqlalchemy.orm_inspect import (
127 gen_columns,
128 gen_relationships,
129)
130from cardinal_pythonlib.sqlalchemy.sqlfunc import (
131 fail_unknown_dialect,
132 fetch_processed_single_clause,
133)
134from isodate.isoerror import ISO8601Error
135from pendulum import DateTime as Pendulum, Duration
136from pendulum.parsing.exceptions import ParserError
137import phonenumbers
138from semantic_version import Version
139from sqlalchemy.dialects import mysql
140from sqlalchemy.engine.interfaces import Dialect
141from sqlalchemy.ext.compiler import compiles
142from sqlalchemy.orm import mapped_column, MappedColumn
143from sqlalchemy.orm.relationships import RelationshipProperty
144from sqlalchemy.sql.elements import conv
145from sqlalchemy.sql.expression import text
146from sqlalchemy.sql.functions import FunctionElement
147from sqlalchemy.sql.schema import Column
148from sqlalchemy.sql.sqltypes import (
149 Boolean,
150 CHAR,
151 DateTime,
152 LargeBinary,
153 String,
154 Text,
155 Unicode,
156 UnicodeText,
157)
158from sqlalchemy.sql.type_api import TypeDecorator
160from camcops_server.cc_modules.cc_constants import PV, StringLengths
161from camcops_server.cc_modules.cc_simpleobjects import IdNumReference
162from camcops_server.cc_modules.cc_sqlalchemy import (
163 LONG_COLUMN_NAME_WARNING_LIMIT,
164)
165from camcops_server.cc_modules.cc_version import make_version
167if TYPE_CHECKING:
168 from sqlalchemy.sql.elements import ClauseElement
169 from sqlalchemy.sql.compiler import SQLCompiler
170 from sqlalchemy.sql.type_api import _CT, ColumnElement, OperatorType
171 from camcops_server.cc_modules.cc_db import (
172 GenericTabletRecordMixin,
173 )
175log = BraceStyleAdapter(logging.getLogger(__name__))
178# =============================================================================
179# Debugging options
180# =============================================================================
# Per-column-type debugging switches. Each one enables verbose logging in the
# corresponding column type; all should be False in production.
DEBUG_DATETIME_AS_ISO_TEXT = False
DEBUG_DURATION_AS_ISO_TEXT = False
DEBUG_IDNUMDEF_LIST = False
DEBUG_INT_LIST_COLTYPE = False
DEBUG_SEMANTIC_VERSION = False
DEBUG_STRING_LIST_COLTYPE = False

# Warn once, at import time, if any debugging switch has been left on.
if (
    DEBUG_DATETIME_AS_ISO_TEXT
    or DEBUG_DURATION_AS_ISO_TEXT
    or DEBUG_SEMANTIC_VERSION
    or DEBUG_IDNUMDEF_LIST
    or DEBUG_INT_LIST_COLTYPE
    or DEBUG_STRING_LIST_COLTYPE
):
    log.warning("Debugging options enabled!")
202# =============================================================================
203# Constants
204# =============================================================================
class RelationshipInfo:
    """
    Names of the keys that CamCOPS places in the ``info`` (user-defined)
    dictionary parameter of SQLAlchemy ``relationship`` calls; see
    https://docs.sqlalchemy.org/en/latest/orm/relationship_api.html#sqlalchemy.orm.relationship.
    """

    # Key names only; the meaning of each flag is defined by the code that
    # reads the relationship's ``info`` dictionary.
    IS_ANCILLARY = "is_ancillary"
    IS_BLOB = "is_blob"
218# =============================================================================
219# Simple derivative column types
220# =============================================================================
221# If you insert something too long into a VARCHAR, it just gets truncated.
# Each name below is a ready-made SQLAlchemy type instance whose length comes
# from the matching ``StringLengths`` constant. ``String`` is used for plain
# text and ``Unicode`` where the column should explicitly support Unicode.

AuditSourceColType = String(length=StringLengths.AUDIT_SOURCE_MAX_LEN)

# Historical note: there used to be a type
#   BigIntUnsigned = Integer().with_variant(mysql.BIGINT(unsigned=True), 'mysql')
# Partly because Alembic broke on variants (Aug 2017), and partly because it
# was nonstandard and unnecessary, all BigIntUnsigned columns were changed to
# BigInteger (2017-08-25).

Base32ColType = String(length=StringLengths.BASE32_MAX_LEN)

CharColType = String(length=1)
CharsetColType = String(length=StringLengths.CHARSET_MAX_LEN)
CurrencyColType = Unicode(length=StringLengths.CURRENCY_MAX_LEN)

DatabaseTitleColType = Unicode(length=StringLengths.DATABASE_TITLE_MAX_LEN)
DeviceNameColType = String(length=StringLengths.DEVICE_NAME_MAX_LEN)
DiagnosticCodeColType = String(length=StringLengths.DIAGNOSTIC_CODE_MAX_LEN)

EmailAddressColType = Unicode(length=StringLengths.EMAIL_ADDRESS_MAX_LEN)
EraColType = String(length=StringLengths.ISO8601_DATETIME_STRING_MAX_LEN)
ExportRecipientNameColType = String(
    length=StringLengths.EXPORT_RECIPIENT_NAME_MAX_LEN
)
ExportTransmissionMethodColType = String(
    length=StringLengths.SENDING_FORMAT_MAX_LEN
)

FilterTextColType = Unicode(length=StringLengths.FILTER_TEXT_MAX_LEN)
FileSpecColType = Unicode(length=StringLengths.FILESPEC_MAX_LEN)
FullNameColType = Unicode(length=StringLengths.FULLNAME_MAX_LEN)

GroupDescriptionColType = Unicode(
    length=StringLengths.GROUP_DESCRIPTION_MAX_LEN
)
GroupNameColType = Unicode(length=StringLengths.GROUP_NAME_MAX_LEN)

HashedPasswordColType = String(length=StringLengths.HASHED_PW_MAX_LEN)
# Why no collation is set on the hashed-password column:
# - You might think that we must ensure case-SENSITIVE comparison on this
#   field. That would require the option collation='utf8mb4_bin' to String(),
#   for MySQL.
# - However, that is MySQL-specific, and SQLAlchemy (as of Oct 2017) doesn't
#   support database-specific *per-column* collations. SQLite accepts COLLATE
#   commands but chokes on 'utf8mb4_bin'.
# - The hashed password from bcrypt() is case-sensitive, BUT we always
#   retrieve the string from the database and do a case-sensitive comparison
#   in Python (see calls to is_password_valid()). So the database collation
#   doesn't matter, and we don't set it.
# See further notes in cc_sqlalchemy.py
HL7AssigningAuthorityType = String(length=StringLengths.HL7_AA_MAX_LEN)
HL7IdTypeType = String(length=StringLengths.HL7_ID_TYPE_MAX_LEN)
HostnameColType = String(length=StringLengths.HOSTNAME_MAX_LEN)

IdDescriptorColType = Unicode(length=StringLengths.ID_DESCRIPTOR_MAX_LEN)
IdPolicyColType = String(length=StringLengths.ID_POLICY_MAX_LEN)
# Historical note (no longer used):
#   IntUnsigned = Integer().with_variant(mysql.INTEGER(unsigned=True), 'mysql')
IPAddressColType = String(length=StringLengths.IP_ADDRESS_MAX_LEN)
# IP addresses are stored as a plain string. For a richer alternative, see
# e.g. http://sqlalchemy-utils.readthedocs.io/en/latest/_modules/sqlalchemy_utils/types/ip_address.html # noqa

LanguageCodeColType = String(length=StringLengths.LANGUAGE_CODE_MAX_LEN)

# Large BLOB/text types: generic type everywhere, with a MySQL-specific
# variant so that MySQL gets LONGBLOB/LONGTEXT. See
# https://stackoverflow.com/questions/43791725/sqlalchemy-how-to-make-a-longblob-column-in-mysql # noqa
# The rejected alternative, passing length=LONGBLOB_LONGTEXT_MAX_LEN to
# LargeBinary()/UnicodeText(), doesn't translate to SQL Server.
# noinspection PyTypeChecker
LongBlob = LargeBinary().with_variant(mysql.LONGBLOB, "mysql")

# noinspection PyTypeChecker
LongText = UnicodeText().with_variant(mysql.LONGTEXT, "mysql")

MfaMethodColType = String(length=StringLengths.MFA_METHOD_MAX_LEN)
MimeTypeColType = String(length=StringLengths.MIMETYPE_MAX_LEN)

PatientNameColType = Unicode(length=StringLengths.PATIENT_NAME_MAX_LEN)

Rfc2822DateColType = String(length=StringLengths.RFC_2822_DATE_MAX_LEN)

SessionTokenColType = String(length=StringLengths.SESSION_TOKEN_MAX_LEN)
SexColType = String(length=1)
SummaryCategoryColType = String(
    length=StringLengths.TASK_SUMMARY_TEXT_FIELD_DEFAULT_MAX_LEN
)
# ... pretty generic

TableNameColType = String(length=StringLengths.TABLENAME_MAX_LEN)

UrlColType = String(length=StringLengths.URL_MAX_LEN)
UserNameCamcopsColType = String(length=StringLengths.USERNAME_CAMCOPS_MAX_LEN)
UserNameExternalColType = String(
    length=StringLengths.USERNAME_EXTERNAL_MAX_LEN
)
316# =============================================================================
317# Helper operations for PendulumDateTimeAsIsoTextColType
318# =============================================================================
319# Database string format is e.g.
320# 2013-07-24T20:04:07.123456+01:00
321# 2013-07-24T20:04:07.123+01:00
322# 0 1 2 3 } position in string; 1-based
323# 12345678901234567890123456789012 }
324#
325# So: rightmost 6 characters are time zone; rest is date/time.
326# leftmost 23 characters are time up to millisecond precision.
327# overall length is typically 29 (milliseconds) or 32 (microseconds)
# Length of the timezone suffix (e.g. "+01:00") of our ISO-8601 strings.
_TZ_LEN = 6
# UTC offset, already quoted as an SQL string literal.
_UTC_TZ_LITERAL = "'+00:00'"
# STRFTIME() format used for SQLite, already quoted as an SQL string literal.
# It has a space (not "T") between date and time.
_SQLITE_DATETIME_FMT_FOR_PYTHON = "'%Y-%m-%d %H:%M:%f'"

# Text lengths used to recognize values that are already plain DATETIME
# (or, for SQL Server, DATETIME2) rather than one of our ISO-8601 strings.
_MYSQL_DATETIME_LEN = 19
_SQLSERVER_DATETIME_LEN = 19
_SQLSERVER_DATETIME2_LEN = 27
338# -----------------------------------------------------------------------------
339# isotzdatetime_to_utcdatetime
340# -----------------------------------------------------------------------------
343# noinspection PyPep8Naming
class isotzdatetime_to_utcdatetime(FunctionElement):
    """
    SQL function element used by :class:`PendulumDateTimeAsIsoTextColType`.

    Wraps an expression holding one of our ISO-8601 text values and yields an
    SQL ``DATETIME`` expressed in UTC.

    The SQL itself is produced by the dialect-specific ``@compiles``
    functions registered for this class.
    """

    name = "isotzdatetime_to_utcdatetime"
    type = DateTime()
    inherit_cache = False
359# noinspection PyUnusedLocal
@compiles(isotzdatetime_to_utcdatetime)
def isotzdatetime_to_utcdatetime_default(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> None:
    """
    Fallback compilation of :class:`isotzdatetime_to_utcdatetime`, used for
    any dialect without a specific implementation: report the failure via
    ``fail_unknown_dialect``.
    """
    task = "perform isotzdatetime_to_utcdatetime"
    fail_unknown_dialect(compiler, task)
370# noinspection PyUnusedLocal
@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.MYSQL)
def isotzdatetime_to_utcdatetime_mysql(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    MySQL compilation of :class:`isotzdatetime_to_utcdatetime`.

    The generated SQL has this shape (``x`` being the compiled operand):

    .. code-block:: none

        CONVERT_TZ(
            STR_TO_DATE(LEFT(x, LENGTH(x) - 6), <format>),
            RIGHT(x, 6),
            '+00:00'
        )

    For the format codes, see
    https://dev.mysql.com/doc/refman/5.5/en/date-and-time-functions.html#function_date-format
    and note that MySQL uses "%i" (not "%M") for minutes.
    """
    operand = fetch_processed_single_clause(element, compiler)

    # Passing the format through text() and the compiler deals with the
    # necessary escaping of "%" for the DBAPI.
    mysql_fmt = compiler.process(text("'%Y-%m-%dT%H:%i:%S.%f'"))

    # Everything except the rightmost 6 characters (the timezone component)
    # is the local date/time. STR_TO_DATE() returns a DATETIME if the string
    # contains both date and time components.
    local_text = f"LEFT({operand}, LENGTH({operand}) - {_TZ_LEN})"
    local_datetime = f"STR_TO_DATE({local_text}, {mysql_fmt})"

    # The rightmost 6 characters are the source timezone offset.
    source_tz = f"RIGHT({operand}, {_TZ_LEN})"

    return f"CONVERT_TZ({local_datetime}, {source_tz}, {_UTC_TZ_LITERAL})"
405# noinspection PyUnusedLocal
@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.SQLITE)
def isotzdatetime_to_utcdatetime_sqlite(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    SQLite compilation of :class:`isotzdatetime_to_utcdatetime`.

    References:

    - https://sqlite.org/lang_corefunc.html#substr
    - https://sqlite.org/lang_datefunc.html
    - https://www.sqlite.org/lang_expr.html

    **Timezone handling**

    A time of 12:00+01:00 means e.g. midday BST, which is 11:00+00:00 or
    11:00 UTC; that is, the displayed offset is SUBTRACTED to reach UTC.
    No explicit arithmetic is needed here, though: if there is a timezone at
    the end of the string, SQLite's ``STRFTIME()`` converts to UTC
    automatically. The format passed to ``STRFTIME()`` is the OUTPUT format,
    chosen so that a Python datetime will recognize it (so no 'T').

    **Precision**

    The output looks like ``2018-06-01 00:00:00.000``. SQLite provides
    millisecond precision only (in general and via the ``%f`` argument to
    ``STRFTIME``).

    SQLAlchemy's own DATETIME support for SQLite

    - https://docs.sqlalchemy.org/en/13/dialects/sqlite.html?highlight=sqlite#sqlalchemy.dialects.sqlite.DATETIME

    doesn't support timezones, so doesn't help us.

    **Why '000' is appended**

    SQLite compares these values as text (see
    :class:`camcops_server.tasks.core10.Core10ReportDateRangeTests`), so:

    .. code-block:: sql

        SELECT '2018-06-01 00:00:00.000' >= '2018-06-01 00:00:00.000000';  -- 0, false
        SELECT '2018-06-01 00:00:00.000' >= '2018-06-01 00:00:00.000';  -- 1, true

    Either the SQLite side must be rendered to 6 decimal places or the bind
    parameter to 3. We can't always control the bind parameter, so we pad the
    SQLite side with '000'.

    """  # noqa
    operand = fetch_processed_single_clause(element, compiler)
    sqlite_fmt = compiler.process(text(_SQLITE_DATETIME_FMT_FOR_PYTHON))
    milliseconds_utc = f"STRFTIME({sqlite_fmt}, {operand})"
    # Pad from milliseconds to (apparent) microseconds; see docstring.
    return f"({milliseconds_utc} || '000')"
459# noinspection PyUnusedLocal
@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.SQLSERVER)
def isotzdatetime_to_utcdatetime_sqlserver(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    SQL Server compilation of :class:`isotzdatetime_to_utcdatetime`.

    **Converting strings to DATETIME values**

    - ``CAST()``: Part of ANSI SQL.
    - ``CONVERT()``: Not part of ANSI SQL; has some extra formatting options.

    Both methods work:

    .. code-block:: sql

        SELECT CAST('2001-01-31T21:30:49.123' AS DATETIME) AS via_cast,
               CONVERT(DATETIME, '2001-01-31T21:30:49.123') AS via_convert;

    That is fine on SQL Server 2005, with milliseconds in both cases.
    However, going beyond milliseconds doesn't fail gracefully; it causes an
    error (e.g. "...21:30.49.123456") both for CAST and CONVERT.

    The ``DATETIME2`` format accepts greater precision, but requires SQL
    Server 2008 or higher. Then this works:

    .. code-block:: sql

        SELECT CAST('2001-01-31T21:30:49.123456' AS DATETIME2) AS via_cast,
               CONVERT(DATETIME2, '2001-01-31T21:30:49.123456') AS via_convert;

    A caution: ``CAST(x AS DATETIME2)`` silently ignores any timezone
    information in the string. So does ``CONVERT(DATETIME2, x, {0 or 1})``.

    **Converting between time zones**

    There is NO TIME ZONE SUPPORT in SQL Server 2005; e.g.
    https://stackoverflow.com/questions/3200827/how-to-convert-timezones-in-sql-server-2005.

    .. code-block:: none

        TODATETIMEOFFSET(expression, time_zone):
            expression: something that evaluates to a DATETIME2 value
            time_zone: integer minutes, or string hours/minutes e.g. "+13.00"
            -> produces a DATETIMEOFFSET value

    Available from SQL Server 2008
    (https://docs.microsoft.com/en-us/sql/t-sql/functions/todatetimeoffset-transact-sql).

    .. code-block:: none

        SWITCHOFFSET
            -> converts one DATETIMEOFFSET value to another, preserving its UTC
               time, but changing the displayed (local) time zone.

    Is ``SWITCHOFFSET`` unnecessary? We want a plain ``DATETIME2`` in UTC,
    and:

    - conversion to UTC is automatically achieved by ``CONVERT(DATETIME2,
      some_datetimeoffset, 1)``; see
      https://stackoverflow.com/questions/4953903/how-can-i-convert-a-sql-server-2008-datetimeoffset-to-a-datetime
    - but not by ``CAST(some_datetimeoffset AS DATETIME2)``, and not by
      ``CONVERT(DATETIME2, some_datetimeoffset, 0)``;
    - styles 0 and 1 are the only ones permissible from SQL Server 2012 and
      up (empirically, and documented for the reverse direction at
      https://docs.microsoft.com/en-us/sql/t-sql/functions/cast-and-convert-transact-sql?view=sql-server-2017);
    - this is not properly documented re UTC conversion, as far as the
      original author could see.

    So we use ``SWITCHOFFSET -> CAST`` to be explicit and clear.

    ``AT TIME ZONE``: From SQL Server 2016 only.
    https://docs.microsoft.com/en-us/sql/t-sql/queries/at-time-zone-transact-sql?view=sql-server-2017

    **Therefore**

    - We need to require SQL Server 2008 or higher.
    - Therefore we can use the ``DATETIME2`` type.
    - SQL Server only supports ``LEN()``, not ``LENGTH()``, for string
      length.

    **Example (tested on SQL Server 2014)**

    .. code-block:: sql

        DECLARE @source AS VARCHAR(100) = '2001-01-31T21:30:49.123456+07:00';

        SELECT CAST(
            SWITCHOFFSET(
                TODATETIMEOFFSET(
                    CAST(LEFT(@source, LEN(@source) - 6) AS DATETIME2),
                    RIGHT(@source, 6)
                ),
                '+00:00'
            )
            AS DATETIME2
        )  -- 2001-01-31 14:30:49.1234560

    """  # noqa
    operand = fetch_processed_single_clause(element, compiler)

    # VARCHAR pieces of the stored string:
    local_text = f"LEFT({operand}, LEN({operand}) - {_TZ_LEN})"
    source_tz = f"RIGHT({operand}, {_TZ_LEN})"

    # VARCHAR -> DATETIME2 (local time, no timezone information):
    local_datetime2 = f"CAST({local_text} AS DATETIME2)"
    # DATETIME2 + offset -> DATETIMEOFFSET in the original timezone:
    with_source_offset = f"TODATETIMEOFFSET({local_datetime2}, {source_tz})"
    # ... -> DATETIMEOFFSET in UTC:
    with_utc_offset = (
        f"SWITCHOFFSET({with_source_offset}, {_UTC_TZ_LITERAL})"
    )
    # ... -> plain DATETIME2 in UTC:
    return f"CAST({with_utc_offset} AS DATETIME2)"
579# -----------------------------------------------------------------------------
580# unknown_field_to_utcdatetime
581# -----------------------------------------------------------------------------
584# noinspection PyPep8Naming
class unknown_field_to_utcdatetime(FunctionElement):
    """
    SQL function element used by :class:`PendulumDateTimeAsIsoTextColType`.

    Wraps an expression whose nature is unknown (it might be a ``DATETIME``
    or one of our ISO-formatted text fields) and yields an SQL ``DATETIME``
    expressed in UTC.

    The SQL itself is produced by the dialect-specific ``@compiles``
    functions registered for this class.
    """

    name = "unknown_field_to_utcdatetime"
    type = DateTime()
    inherit_cache = False
601# noinspection PyUnusedLocal
@compiles(unknown_field_to_utcdatetime)
def unknown_field_to_utcdatetime_default(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> None:
    """
    Fallback compilation of :class:`unknown_field_to_utcdatetime`, used for
    any dialect without a specific implementation: report the failure via
    ``fail_unknown_dialect``.
    """
    task = "perform unknown_field_to_utcdatetime"
    fail_unknown_dialect(compiler, task)
612# noinspection PyUnusedLocal
@compiles(unknown_field_to_utcdatetime, SqlaDialectName.MYSQL)
def unknown_field_to_utcdatetime_mysql(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    MySQL compilation of :class:`unknown_field_to_utcdatetime`.

    A value whose length is that of a plain ``DATETIME`` (19 characters, e.g.
    ``2013-05-30 00:00:00``) is passed through unchanged; anything else is
    treated as ISO text and converted via
    :func:`isotzdatetime_to_utcdatetime_mysql`.
    """
    operand = fetch_processed_single_clause(element, compiler)
    iso_as_utc = isotzdatetime_to_utcdatetime_mysql(element, compiler, **kw)
    looks_like_datetime = f"LENGTH({operand}) = {_MYSQL_DATETIME_LEN}"
    return f"IF({looks_like_datetime}, {operand}, {iso_as_utc})"
630# noinspection PyUnusedLocal
@compiles(unknown_field_to_utcdatetime, SqlaDialectName.SQLITE)
def unknown_field_to_utcdatetime_sqlite(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    SQLite compilation of :class:`unknown_field_to_utcdatetime`.

    Renders ``STRFTIME(<format>, x)`` using the same output format as
    :func:`isotzdatetime_to_utcdatetime_sqlite`.

    NOTE(review): unlike :func:`isotzdatetime_to_utcdatetime_sqlite`, this
    does not append ``'000'``, so the result has 3 (not 6) decimal places.
    Since SQLite compares these values as text, comparing the two forms may
    be inconsistent -- TODO confirm whether that is intended.
    """
    operand = fetch_processed_single_clause(element, compiler)
    sqlite_fmt = compiler.process(text(_SQLITE_DATETIME_FMT_FOR_PYTHON))
    return f"STRFTIME({sqlite_fmt}, {operand})"
645# noinspection PyUnusedLocal
@compiles(unknown_field_to_utcdatetime, SqlaDialectName.SQLSERVER)
def unknown_field_to_utcdatetime_sqlserver(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    SQL Server compilation of :class:`unknown_field_to_utcdatetime`.

    The operand might be a ``DATETIME``, a ``DATETIME2``, or one of our ISO
    strings. It seems consistent that ``LEN(DATETIME2) = 27``, with precision
    of a tenth of a microsecond, e.g. ``2001-01-31 21:30:49.1234567`` (27).

    If the operand's length matches ``DATETIME`` or ``DATETIME2``, it is left
    alone; otherwise it goes through
    :func:`isotzdatetime_to_utcdatetime_sqlserver`.

    This relies on neither ``_SQLSERVER_DATETIME_LEN`` nor
    ``_SQLSERVER_DATETIME2_LEN`` being the length of any of our ISO strings.
    """
    operand = fetch_processed_single_clause(element, compiler)
    iso_as_utc = isotzdatetime_to_utcdatetime_sqlserver(
        element, compiler, **kw
    )
    # "IN" rather than "OR" inside CASE WHEN; see
    # https://stackoverflow.com/questions/5487892/sql-server-case-when-or-then-else-end-the-or-is-not-supported # noqa
    native_lengths = f"({_SQLSERVER_DATETIME_LEN}, {_SQLSERVER_DATETIME2_LEN})"
    return (
        f"CASE WHEN LEN({operand}) IN {native_lengths} THEN {operand} "
        f"ELSE {iso_as_utc} END"
    )
677# =============================================================================
678# Custom date/time field as ISO-8601 text including timezone, using
679# pendulum.DateTime on the Python side.
680# =============================================================================
class PendulumDateTimeAsIsoTextColType(TypeDecorator):
    """
    Stores date/time values as ISO-8601 text, in a specific format (e.g.
    ``2013-07-24T20:04:07.123456+01:00``).
    Uses Pendulum on the Python side.

    Comparisons made in SQL are routed through ``comparator_factory``, which
    wraps the column in :class:`isotzdatetime_to_utcdatetime` so that
    comparisons are made on UTC ``DATETIME`` values rather than on raw text.
    """

    impl = String(length=StringLengths.ISO8601_DATETIME_STRING_MAX_LEN)
    # ... underlying SQL type

    cache_ok = False

    # Name used as a prefix in debugging log messages.
    _coltype_name = "PendulumDateTimeAsIsoTextColType"

    @property
    def python_type(self) -> type:
        """
        The Python type of the object.
        """
        return Pendulum

    @staticmethod
    def pendulum_to_isostring(x: PotentialDatetimeType) -> Optional[str]:
        """
        From a Python datetime to an ISO-formatted string in our particular
        format.

        Args:
            x: anything that ``coerce_to_pendulum`` accepts.

        Returns:
            a string like ``2013-07-24T20:04:07.123456+01:00``, or ``None``
            if the coerced value has no ``strftime`` method (presumably
            because ``coerce_to_pendulum`` returned ``None``).
        """
        # https://docs.python.org/3.4/library/datetime.html#strftime-strptime-behavior # noqa
        x = coerce_to_pendulum(x)
        try:
            mainpart = x.strftime(
                "%Y-%m-%dT%H:%M:%S.%f"
            )  # microsecond accuracy
            timezone = x.strftime("%z")  # won't have the colon in
            # Insert the colon: "+0100" -> "+01:00".
            return mainpart + timezone[:-2] + ":" + timezone[-2:]
        except AttributeError:
            return None

    @staticmethod
    def isostring_to_pendulum(x: Optional[str]) -> Optional[Pendulum]:
        """
        From an ISO-formatted string to a Python Pendulum, with timezone.

        Returns ``None`` (after logging a warning) if the string cannot be
        parsed.
        """
        try:
            return coerce_to_pendulum(x)
        except (ParserError, ValueError):
            log.warning("Bad ISO date/time string: {!r}", x)
            return None

    def process_bind_param(
        self, value: Optional[Pendulum], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database.
        """
        retval = self.pendulum_to_isostring(value)
        if DEBUG_DATETIME_AS_ISO_TEXT:
            log.debug(
                "{}.process_bind_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_literal_param(
        self, value: Optional[Pendulum], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert literals on the way from Python to the database.
        """
        retval = self.pendulum_to_isostring(value)
        if DEBUG_DATETIME_AS_ISO_TEXT:
            log.debug(
                "{}.process_literal_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[Pendulum]:
        """
        Convert things on the way from the database to Python.
        """
        retval = self.isostring_to_pendulum(value)
        if DEBUG_DATETIME_AS_ISO_TEXT:
            log.debug(
                "{}.process_result_value("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    # noinspection PyPep8Naming
    class comparator_factory(TypeDecorator.Comparator):
        """
        Process SQL for when we are comparing our column, in the database,
        to something else.

        We make this dialect-independent by calling functions like

        .. code-block:: none

            unknown_field_to_utcdatetime
            isotzdatetime_to_utcdatetime

        ... which we then specialize for specific dialects.

        This function itself does not appear to be able to access any
        information about the dialect.
        """

        def operate(
            self, op: "OperatorType", *other: Any, **kwargs: Any
        ) -> "ColumnElement[_CT]":
            """
            Apply ``op`` to our column (converted to a UTC ``DATETIME``) and
            a single other operand.

            If the other operand can be coerced to a Pendulum datetime, it is
            converted to UTC in Python; otherwise it is wrapped in
            :class:`unknown_field_to_utcdatetime` for conversion in SQL.
            """
            assert len(other) == 1
            assert not kwargs
            other = other[0]
            try:
                processed_other = convert_datetime_to_utc(
                    coerce_to_pendulum(other)
                )
                # - If you try to call a dialect-specialized FunctionElement,
                #   it processes the clause to "?" (meaning "attach bind
                #   parameter here"); it's not the value itself.
                # - For our SQLite "milliseconds only" comparator problem
                #   (see isotzdatetime_to_utcdatetime_sqlite), we can't do
                #   very much here without knowing the dialect. So we make
                #   the SQLite side look like it has microseconds by
                #   appending "000"...
            except (AttributeError, ParserError, TypeError, ValueError):
                # OK. At this point, "other" could be a plain DATETIME field,
                # or a PendulumDateTimeAsIsoTextColType field (or potentially
                # something else that we don't really care about). If it's a
                # DATETIME, then we assume it is already in UTC.
                processed_other = unknown_field_to_utcdatetime(other)  # type: ignore[assignment]  # noqa: E501
            if DEBUG_DATETIME_AS_ISO_TEXT:
                log.debug(
                    "operate(self={!r}, op={!r}, other={!r})", self, op, other
                )
                log.debug("self.expr = {!r}", self.expr)
                log.debug("processed_other = {!r}", processed_other)
                # traceback.print_stack()
            return op(isotzdatetime_to_utcdatetime(self.expr), processed_other)

        def reverse_operate(
            self, op: "OperatorType", *other: Any, **kwargs: Any
        ) -> NoReturn:
            """
            Not expected to be called; always raises.

            Raises:
                AssertionError: always.
            """
            # Raise explicitly rather than via "assert False": assert
            # statements are removed under "python -O", which would make this
            # NoReturn method silently return None.
            raise AssertionError("I don't think this is ever being called")
845# =============================================================================
846# Custom duration field as ISO-8601 text, using pendulum.Duration on the Python
847# side.
848# =============================================================================
class PendulumDurationAsIsoTextColType(TypeDecorator):
    """
    Column type that stores time durations as ISO-8601 text, in a specific
    format, and presents them as :class:`pendulum.Duration` objects on the
    Python side.

    There is deliberately no ``comparator_factory``: we do not use SQL to
    compare ISO durations.
    """

    # Underlying SQL type:
    impl = String(length=StringLengths.ISO8601_DURATION_STRING_MAX_LEN)

    cache_ok = False

    # Name used as a prefix in debugging log messages.
    _coltype_name = "PendulumDurationAsIsoTextColType"

    @property
    def python_type(self) -> type:
        """
        The Python type of the object: :class:`pendulum.Duration`.
        """
        return Duration

    @staticmethod
    def pendulum_duration_to_isostring(x: Optional[Duration]) -> Optional[str]:
        """
        Convert a :class:`pendulum.Duration` to an ISO-formatted string in
        our particular format. ``None`` maps to ``None`` (SQL ``NULL``).
        """
        return (
            None
            if x is None
            else duration_to_iso(
                x, permit_years_months=True, minus_sign_at_front=True
            )
        )

    @staticmethod
    def isostring_to_pendulum_duration(x: Optional[str]) -> Optional[Duration]:
        """
        Convert an ISO-formatted duration string to a
        :class:`pendulum.Duration`.

        Returns ``None`` for ``None`` (SQL ``NULL``) or a blank string, and
        also (after logging a warning) for a string that cannot be parsed.
        """
        if x:
            try:
                return duration_from_iso(x)
            except (ISO8601Error, ValueError):
                log.warning("Bad ISO duration string: {!r}", x)
        return None

    def process_bind_param(
        self, value: Optional[Duration], dialect: Dialect
    ) -> Optional[str]:
        """
        Python -> database, for bound parameters: render the duration as ISO
        text.
        """
        iso_text = self.pendulum_duration_to_isostring(value)
        if not DEBUG_DURATION_AS_ISO_TEXT:
            return iso_text
        log.debug(
            "{}.process_bind_param(self={!r}, "
            "value={!r}, dialect={!r}) -> {!r}",
            self._coltype_name,
            self,
            value,
            dialect,
            iso_text,
        )
        return iso_text

    def process_literal_param(
        self, value: Optional[Duration], dialect: Dialect
    ) -> Optional[str]:
        """
        Python -> database, for literals: render the duration as ISO text.
        """
        iso_text = self.pendulum_duration_to_isostring(value)
        if not DEBUG_DURATION_AS_ISO_TEXT:
            return iso_text
        log.debug(
            "{}.process_literal_param(self={!r}, "
            "value={!r}, dialect={!r}) -> {!r}",
            self._coltype_name,
            self,
            value,
            dialect,
            iso_text,
        )
        return iso_text

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[Duration]:
        """
        Database -> Python: parse the stored ISO text into a duration.
        """
        duration = self.isostring_to_pendulum_duration(value)
        if not DEBUG_DURATION_AS_ISO_TEXT:
            return duration
        log.debug(
            "{}.process_result_value(self={!r}, "
            "value={!r}, dialect={!r}) -> {!r}",
            self._coltype_name,
            self,
            value,
            dialect,
            duration,
        )
        return duration

    # No comparator_factory; we do not use SQL to compare ISO durations.
953 # No comparator_factory; we do not use SQL to compare ISO durations.
956# =============================================================================
957# Semantic version column type
958# =============================================================================
class SemanticVersionColType(TypeDecorator):
    """
    Stores semantic versions in the database.
    Uses :class:`semantic_version.Version` on the Python side; the database
    holds the version's string form.
    """

    impl = String(length=147)  # https://github.com/mojombo/semver/issues/79

    cache_ok = False

    _coltype_name = "SemanticVersionColType"

    @property
    def python_type(self) -> type:
        """
        The Python type of the object.
        """
        return Version

    def process_bind_param(
        self, value: Optional[Version], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database.

        ``None`` is passed through as ``None`` (NULL); anything else is
        converted with ``str()``.
        """
        retval = str(value) if value is not None else None
        if DEBUG_SEMANTIC_VERSION:
            log.debug(
                "{}.process_bind_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_literal_param(
        self, value: Optional[Version], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert literals on the way from Python to the database.

        ``None`` is passed through as ``None`` (NULL); anything else is
        converted with ``str()``.
        """
        retval = str(value) if value is not None else None
        if DEBUG_SEMANTIC_VERSION:
            # The final placeholder must be "{!r}" (not a bare "!r"), or the
            # return value is never shown in the debug output.
            log.debug(
                "{}.process_literal_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[Version]:
        """
        Convert things on the way from the database to Python.

        ``None`` (NULL) is passed through; any other value is converted via
        ``make_version()``.
        """
        if value is None:
            retval = None
        else:
            # Here we do some slightly fancier conversion to deal with all
            # sorts of potential rubbish coming in, so we get a properly
            # ordered Version out:
            retval = make_version(value)
        if DEBUG_SEMANTIC_VERSION:
            log.debug(
                "{}.process_result_value("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    # The comparator below is disabled: it is kept only as a bare string
    # literal, so none of it executes. NOTE(review): as written it calls
    # str(Version) (the class) rather than str(other); fix that if it is ever
    # re-enabled.
    '''
    # noinspection PyPep8Naming
    class comparator_factory(TypeDecorator.Comparator):
        """
        Process SQL for when we are comparing our column, in the database,
        to something else.

        See https://docs.sqlalchemy.org/en/13/core/type_api.html#sqlalchemy.types.TypeEngine.comparator_factory.

        .. warning::

            I'm not sure this is either (a) correct or (b) used; it may
            produce a string comparison of e.g. ``14.0.0`` versus ``2.0.0``,
            which will be alphabetical and therefore wrong.
            Disabled on 2019-04-28.

        """  # noqa

        def operate(self, op, *other, **kwargs):
            assert len(other) == 1
            assert not kwargs
            other = other[0]
            if isinstance(other, Version):
                processed_other = str(Version)
            else:
                processed_other = other
            return op(self.expr, processed_other)

        def reverse_operate(self, op, *other, **kwargs):
            assert False, "I don't think this is ever being called"
    '''
1076# =============================================================================
1077# IdNumReferenceListColType
1078# =============================================================================
class IdNumReferenceListColType(TypeDecorator):
    """
    Stores a list of IdNumReference objects.
    On the database side, uses a comma-separated list of integers, laid out
    as consecutive ``which_idnum, idnum_value`` pairs.
    """

    impl = Text()
    # NOTE(review): unlike the other column types in this file, no
    # ``cache_ok`` attribute is set here -- confirm whether that is intended.
    _coltype_name = "IdNumReferenceListColType"

    @property
    def python_type(self) -> type:
        """
        The Python type of the object.
        """
        return list

    @staticmethod
    def _idnumdef_list_to_dbstr(
        idnumdef_list: Optional[List[IdNumReference]],
    ) -> str:
        """
        Converts an optional list of
        :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
        objects to a CSV string suitable for storing in the database.

        ``None`` or an empty list gives an empty string.
        """
        if not idnumdef_list:
            return ""
        elements = []  # type: List[int]
        for idnumdef in idnumdef_list:
            elements.append(idnumdef.which_idnum)
            elements.append(idnumdef.idnum_value)
        return ",".join(str(x) for x in elements)

    @staticmethod
    def _dbstr_to_idnumdef_list(dbstr: Optional[str]) -> List[IdNumReference]:
        """
        Converts a CSV string (from the database) to a list of
        :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
        objects.

        Any malformed input (``None``, non-integer content, an odd number of
        integers, or a negative integer) gives an empty list.
        """
        idnumdef_list = []  # type: List[IdNumReference]
        try:
            intlist = [int(numstr) for numstr in dbstr.split(",")]
        except (AttributeError, TypeError, ValueError):
            # AttributeError covers dbstr being None; ValueError covers
            # non-integer content (including an empty string).
            return []
        length = len(intlist)
        if length == 0 or length % 2 != 0:  # enforce pairs
            return []
        for which_idnum, idnum_value in chunks(intlist, n=2):
            if which_idnum < 0 or idnum_value < 0:  # reject negative integers
                return []
            idnumdef_list.append(
                IdNumReference(
                    which_idnum=which_idnum, idnum_value=idnum_value
                )
            )
        return idnumdef_list

    def process_bind_param(
        self, value: Optional[List[IdNumReference]], dialect: Dialect
    ) -> str:
        """
        Convert parameters on the way from Python to the database.
        """
        retval = self._idnumdef_list_to_dbstr(value)
        if DEBUG_IDNUMDEF_LIST:
            log.debug(
                "{}.process_bind_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_literal_param(
        self, value: Optional[List[IdNumReference]], dialect: Dialect
    ) -> str:
        """
        Convert literals on the way from Python to the database.
        """
        retval = self._idnumdef_list_to_dbstr(value)
        if DEBUG_IDNUMDEF_LIST:
            # The final placeholder must be "{!r}" (not a bare "!r"), or the
            # return value is never shown in the debug output.
            log.debug(
                "{}.process_literal_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> List[IdNumReference]:
        """
        Convert things on the way from the database to Python.
        """
        retval = self._dbstr_to_idnumdef_list(value)
        if DEBUG_IDNUMDEF_LIST:
            log.debug(
                "{}.process_result_value("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval
1197# =============================================================================
1198# UUID column type
1199# =============================================================================
class UuidColType(TypeDecorator):
    """
    Stores a :class:`uuid.UUID` as a 32-character hexadecimal string.
    """

    # Based on:
    # https://docs.sqlalchemy.org/en/13/core/custom_types.html#backend-agnostic-guid-type  # noqa: E501
    # which will use postgresql UUID if relevant, not doing that here

    impl = CHAR(32)

    cache_ok = False

    @property
    def python_type(self) -> type:
        return str

    def process_bind_param(
        self, value: uuid.UUID, dialect: Dialect
    ) -> Optional[str]:
        """
        Python -> database: the UUID's integer value as zero-padded hex;
        ``None`` is passed through as NULL.
        """
        return None if value is None else "%.32x" % value.int

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[uuid.UUID]:
        """
        Database -> Python: build a :class:`uuid.UUID` from the stored string;
        NULL is passed through as ``None``.
        """
        return None if value is None else uuid.UUID(value)
1238# =============================================================================
1239# JSON column type
1240# =============================================================================
class JsonColType(TypeDecorator):
    """
    Stores JSON-serializable Python values as JSON text.
    """

    # Unlike
    # https://docs.sqlalchemy.org/en/13/core/type_basics.html#sqlalchemy.types.JSON
    # does not use vendor-specific JSON type
    impl = UnicodeText

    cache_ok = False

    @property
    def python_type(self) -> type:
        return str

    def process_bind_param(
        self, value: Any, dialect: Dialect
    ) -> Optional[str]:
        """
        Python -> database: serialize with ``json.dumps``; ``None`` is passed
        through as NULL (not stored as the JSON text ``null``).
        """
        return None if value is None else json.dumps(value)

    def process_result_value(self, value: str, dialect: Dialect) -> Any:
        """
        Database -> Python: deserialize with ``json.loads``; NULL is passed
        through as ``None``.
        """
        return None if value is None else json.loads(value)
1276# =============================================================================
1277# Phone number column type
1278# =============================================================================
class PhoneNumberColType(TypeDecorator):
    """
    Stores phone numbers as E164-formatted text, converting to and from
    objects of the ``phonenumbers`` library.
    """

    impl = Unicode(length=StringLengths.PHONE_NUMBER_MAX_LEN)

    cache_ok = False

    @property
    def python_type(self) -> type:
        return str

    def process_bind_param(
        self, value: Any, dialect: Dialect
    ) -> Optional[str]:
        """
        Python -> database: format the number as E164; ``None`` is passed
        through as NULL.
        """
        if value is not None:
            e164 = phonenumbers.PhoneNumberFormat.E164
            return phonenumbers.format_number(value, e164)
        return None

    def process_result_value(self, value: str, dialect: Dialect) -> Any:
        """
        Database -> Python: parse the stored string; NULL or an empty string
        gives ``None``.
        """
        if value:
            # Should be stored as E164 so no need to pass a region
            return phonenumbers.parse(value, None)
        return None
1314# =============================================================================
1315# PermittedValueChecker: used by camcops_column
1316# =============================================================================
class PermittedValueChecker(object):
    """
    Describes which values are permitted (for columns belonging to CamCOPS
    tasks) and tests individual values against that description.
    """

    def __init__(
        self,
        not_null: bool = False,
        minimum: Union[int, float] = None,
        maximum: Union[int, float] = None,
        permitted_values: Sequence[Any] = None,
    ) -> None:
        """
        Args:
            not_null: if true, ``None`` (NULL) is not an acceptable value
            minimum: optional numeric lower bound (inclusive)
            maximum: optional numeric upper bound (inclusive)
            permitted_values: optional collection of acceptable values
        """
        self.not_null = not_null
        self.minimum = minimum
        self.maximum = maximum
        self.permitted_values = permitted_values

    def is_ok(self, value: Any) -> bool:
        """
        Is the value acceptable?
        """
        if value is None:
            # NULL is fine unless NULLs are forbidden; either way, no other
            # test applies to it.
            return not self.not_null
        allowed = self.permitted_values
        if allowed is not None and value not in allowed:
            return False
        if self.minimum is not None and value < self.minimum:
            return False
        return not (self.maximum is not None and value > self.maximum)

    def failure_msg(self, value: Any) -> str:
        """
        Explain why the value is unacceptable. Returns an empty string if the
        value is fine.
        """
        if value is None:
            return (
                "value is None and NULL values are not permitted"
                if self.not_null
                else ""  # value is OK
            )
        allowed = self.permitted_values
        if allowed is not None and value not in allowed:
            return f"value {value!r} not in permitted values {allowed!r}"
        lower = self.minimum
        if lower is not None and value < lower:
            return f"value {value!r} less than minimum of {lower!r}"
        upper = self.maximum
        if upper is not None and value > upper:
            return f"value {value!r} more than maximum of {upper!r}"
        return ""

    def __repr__(self) -> str:
        return auto_repr(self)

    def permitted_values_inc_minmax(self) -> Tuple:
        """
        Returns the permitted values as a tuple: those given explicitly, or
        failing that, the integers from minimum to maximum inclusive. Returns
        an empty tuple if neither applies.
        """
        explicit = self.permitted_values
        if explicit:
            return tuple(explicit)
        # Take a punt that integer minima/maxima mean that only integers are
        # permitted...
        lower, upper = self.minimum, self.maximum
        if isinstance(lower, int) and isinstance(upper, int):
            return tuple(range(lower, upper + 1))
        return ()

    def permitted_values_csv(self) -> str:
        """
        Returns the permitted values as a comma-separated string.

        Primarily used for CRIS data dictionaries.
        """
        return ",".join(map(str, self.permitted_values_inc_minmax()))
# Specific instances, to reduce object duplication and magic numbers:
# (Shared module-level PermittedValueChecker objects. Minimum and maximum are
# inclusive bounds: the checker rejects only values below the minimum or above
# the maximum.)

# Lower bound only:
MIN_ZERO_CHECKER = PermittedValueChecker(minimum=0)

# Restricted to the explicit value list PV.BIT:
BIT_CHECKER = PermittedValueChecker(permitted_values=PV.BIT)
# Ranges starting at 0:
ZERO_TO_ONE_CHECKER = PermittedValueChecker(minimum=0, maximum=1)
ZERO_TO_TWO_CHECKER = PermittedValueChecker(minimum=0, maximum=2)
ZERO_TO_THREE_CHECKER = PermittedValueChecker(minimum=0, maximum=3)
ZERO_TO_FOUR_CHECKER = PermittedValueChecker(minimum=0, maximum=4)
ZERO_TO_FIVE_CHECKER = PermittedValueChecker(minimum=0, maximum=5)
ZERO_TO_SIX_CHECKER = PermittedValueChecker(minimum=0, maximum=6)
ZERO_TO_SEVEN_CHECKER = PermittedValueChecker(minimum=0, maximum=7)
ZERO_TO_EIGHT_CHECKER = PermittedValueChecker(minimum=0, maximum=8)
ZERO_TO_NINE_CHECKER = PermittedValueChecker(minimum=0, maximum=9)
ZERO_TO_10_CHECKER = PermittedValueChecker(minimum=0, maximum=10)
ZERO_TO_100_CHECKER = PermittedValueChecker(minimum=0, maximum=100)

# Ranges starting at 1:
ONE_TO_TWO_CHECKER = PermittedValueChecker(minimum=1, maximum=2)
ONE_TO_THREE_CHECKER = PermittedValueChecker(minimum=1, maximum=3)
ONE_TO_FOUR_CHECKER = PermittedValueChecker(minimum=1, maximum=4)
ONE_TO_FIVE_CHECKER = PermittedValueChecker(minimum=1, maximum=5)
ONE_TO_SIX_CHECKER = PermittedValueChecker(minimum=1, maximum=6)
ONE_TO_SEVEN_CHECKER = PermittedValueChecker(minimum=1, maximum=7)
ONE_TO_EIGHT_CHECKER = PermittedValueChecker(minimum=1, maximum=8)
ONE_TO_NINE_CHECKER = PermittedValueChecker(minimum=1, maximum=9)
# ONE_TO_10_CHECKER = PermittedValueChecker(minimum=1, maximum=10)
1439# =============================================================================
1440# camcops_column: provides extra functions over Column.
1441# =============================================================================
# Column attributes
# These strings are the keys of the ``info`` dictionary that camcops_column()
# and mapped_camcops_column() attach to each column they create.
COLATTR_BLOB_RELATIONSHIP_ATTR_NAME = "blob_relationship_attr_name"
COLATTR_EXEMPT_FROM_ANONYMISATION = "exempt_from_anonymisation"
COLATTR_IDENTIFIES_PATIENT = "identifies_patient"
COLATTR_INCLUDE_IN_ANON_STAGING_DB = "include_in_anon_staging_db"
COLATTR_IS_BLOB_ID_FIELD = "is_blob_id_field"
COLATTR_IS_CAMCOPS_COLUMN = "is_camcops_column"
# Also used by _get_bool_column_args() as a keyword-argument name, so it must
# match the ``permitted_value_checker`` parameter of camcops_column():
COLATTR_PERMITTED_VALUE_CHECKER = "permitted_value_checker"
def camcops_column(
    *args: Any,
    include_in_anon_staging_db: bool = False,
    exempt_from_anonymisation: bool = False,
    identifies_patient: bool = False,
    is_blob_id_field: bool = False,
    blob_relationship_attr_name: str = "",
    permitted_value_checker: PermittedValueChecker = None,
    **kwargs: Any,
) -> Column[Any]:
    """
    Creates a :class:`Column` whose ``info`` dictionary carries
    CamCOPS-specific metadata.

    Args:
        *args:
            Positional arguments for the :class:`Column` constructor.
        include_in_anon_staging_db:
            Mark this column for inclusion in data dictionaries for an
            anonymisation staging database.
        exempt_from_anonymisation:
            If true: even though this field might be text, it is guaranteed
            not to contain identifiers (e.g. it might contain only
            predefined disease severity descriptions), so it needs no
            anonymisation.
        identifies_patient:
            If true: the field contains a patient identifier (e.g. name).
        is_blob_id_field:
            If true: the field holds a reference (client FK) to the BLOB
            table.
        blob_relationship_attr_name:
            For BLOB ID fields: the name of the associated relationship
            attribute (which, when accessed, yields the BLOB itself) in
            the owning class/object. Required if ``is_blob_id_field`` is
            true.
        permitted_value_checker:
            Optional :class:`PermittedValueChecker` giving soft constraints
            on the field's contents. (Nothing is enforced at the database
            level, but we can moan if incorrect data are present.)
        **kwargs:
            Keyword arguments for the :class:`Column` constructor.
    """
    assert blob_relationship_attr_name or not is_blob_id_field, (
        "If specifying a BLOB ID field, must give the attribute name "
        "of the relationship too"
    )
    camcops_info = {COLATTR_IS_CAMCOPS_COLUMN: True}  # type: dict[str, Any]
    camcops_info[COLATTR_INCLUDE_IN_ANON_STAGING_DB] = (
        include_in_anon_staging_db
    )
    camcops_info[COLATTR_EXEMPT_FROM_ANONYMISATION] = exempt_from_anonymisation
    camcops_info[COLATTR_IDENTIFIES_PATIENT] = identifies_patient
    camcops_info[COLATTR_IS_BLOB_ID_FIELD] = is_blob_id_field
    camcops_info[COLATTR_BLOB_RELATIONSHIP_ATTR_NAME] = (
        blob_relationship_attr_name
    )
    camcops_info[COLATTR_PERMITTED_VALUE_CHECKER] = permitted_value_checker
    return Column(*args, info=camcops_info, **kwargs)
def mapped_camcops_column(  # type: ignore[no-untyped-def]
    *args,
    include_in_anon_staging_db: bool = False,
    exempt_from_anonymisation: bool = False,
    identifies_patient: bool = False,
    is_blob_id_field: bool = False,
    blob_relationship_attr_name: str = "",
    permitted_value_checker: PermittedValueChecker = None,
    **kwargs,
) -> MappedColumn[Any]:
    """
    Like :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`,
    but builds the column with ``mapped_column()`` and so returns a python
    typing-compatible MappedColumn.

    Args:
        *args:
            Positional arguments passed on to ``mapped_column()``.
        include_in_anon_staging_db:
            Mark this column for inclusion in data dictionaries for an
            anonymisation staging database.
        exempt_from_anonymisation:
            If true: even though this field might be text, it is guaranteed
            not to contain identifiers (e.g. it might contain only
            predefined disease severity descriptions), so it needs no
            anonymisation.
        identifies_patient:
            If true: the field contains a patient identifier (e.g. name).
        is_blob_id_field:
            If true: the field holds a reference (client FK) to the BLOB
            table.
        blob_relationship_attr_name:
            For BLOB ID fields: the name of the associated relationship
            attribute (which, when accessed, yields the BLOB itself) in
            the owning class/object. Required if ``is_blob_id_field`` is
            true.
        permitted_value_checker:
            Optional :class:`PermittedValueChecker` giving soft constraints
            on the field's contents. (Nothing is enforced at the database
            level, but we can moan if incorrect data are present.)
        **kwargs:
            Keyword arguments passed on to ``mapped_column()``.
    """
    assert blob_relationship_attr_name or not is_blob_id_field, (
        "If specifying a BLOB ID field, must give the attribute name "
        "of the relationship too"
    )
    camcops_info = {COLATTR_IS_CAMCOPS_COLUMN: True}  # type: dict[str, Any]
    camcops_info[COLATTR_INCLUDE_IN_ANON_STAGING_DB] = (
        include_in_anon_staging_db
    )
    camcops_info[COLATTR_EXEMPT_FROM_ANONYMISATION] = exempt_from_anonymisation
    camcops_info[COLATTR_IDENTIFIES_PATIENT] = identifies_patient
    camcops_info[COLATTR_IS_BLOB_ID_FIELD] = is_blob_id_field
    camcops_info[COLATTR_BLOB_RELATIONSHIP_ATTR_NAME] = (
        blob_relationship_attr_name
    )
    camcops_info[COLATTR_PERMITTED_VALUE_CHECKER] = permitted_value_checker
    return mapped_column(*args, info=camcops_info, **kwargs)
1568# =============================================================================
1569# Operate on Column/MappedColumn properties
1570# =============================================================================
def gen_columns_matching_attrnames(  # type: ignore[no-untyped-def]
    obj, attrnames: List[str]
) -> Generator[Tuple[str, Column], None, None]:
    """
    Yield the columns of an SQLAlchemy ORM object whose attribute names
    appear in a given list.

    Args:
        obj: SQLAlchemy ORM object to inspect
        attrnames: attribute names to select

    Yields:
        ``attrname, column`` tuples

    """
    yield from (
        (name, col) for name, col in gen_columns(obj) if name in attrnames
    )
def gen_camcops_columns(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[Tuple[str, Column], None, None]:
    """
    Yield those columns of an object that were created as
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` columns
    (i.e. whose ``info`` dictionary carries the CamCOPS marker).

    Args:
        obj: SQLAlchemy ORM object to inspect

    Yields:
        ``attrname, column`` tuples
    """
    for name, col in gen_columns(obj):
        is_camcops = col.info.get(COLATTR_IS_CAMCOPS_COLUMN, False)
        if not is_camcops:
            continue
        yield name, col
def gen_camcops_blob_columns(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[Tuple[str, Column], None, None]:
    """
    Yield those
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` columns
    of an object that are marked as referencing the BLOB table.

    A warning is logged (but the column is still yielded) if the attribute
    name differs from the SQL column name.

    Args:
        obj: SQLAlchemy ORM object to inspect

    Yields:
        ``attrname, column`` tuples
    """
    for name, col in gen_camcops_columns(obj):
        if not col.info.get(COLATTR_IS_BLOB_ID_FIELD, False):
            continue
        if name != col.name:
            log.warning(
                "BLOB field where attribute name {!r} != SQL "
                "column name {!r}",
                name,
                col.name,
            )
        yield name, col
def get_column_attr_names(obj) -> List[str]:  # type: ignore[no-untyped-def]
    """
    Return the attribute names of all columns of an SQLAlchemy ORM object.
    """
    names = []  # type: List[str]
    for name, _column in gen_columns(obj):
        names.append(name)
    return names
def get_camcops_column_attr_names(obj) -> List[str]:  # type: ignore[no-untyped-def]  # noqa: E501
    """
    Return the attribute names of all
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` columns
    of an SQLAlchemy ORM object.
    """
    names = []  # type: List[str]
    for name, _column in gen_camcops_columns(obj):
        names.append(name)
    return names
def get_camcops_blob_column_attr_names(obj) -> List[str]:  # type: ignore[no-untyped-def]  # noqa: E501
    """
    Return the attribute names of all
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` BLOB
    columns of an SQLAlchemy ORM object.
    """
    names = []  # type: List[str]
    for name, _column in gen_camcops_blob_columns(obj):
        names.append(name)
    return names
def permitted_value_failure_msgs(obj) -> List[str]:  # type: ignore[no-untyped-def]  # noqa: E501
    """
    Tests an SQLAlchemy ORM object instance against the permitted value
    checkers attached to its
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`
    columns, if there are any.

    Returns a list of failure messages, one per failing column (an empty list
    means all OK).

    If you only need a yes/no answer, :func:`permitted_values_ok` is quicker.
    """
    problems = []  # type: List[str]
    for attrname, column in gen_camcops_columns(obj):
        checker = column.info.get(
            COLATTR_PERMITTED_VALUE_CHECKER
        )  # type: Optional[PermittedValueChecker]
        if checker is not None:
            reason = checker.failure_msg(getattr(obj, attrname))
            if reason:
                problems.append(f"Invalid value for {attrname}: {reason}")
    return problems
def permitted_values_ok(obj) -> bool:  # type: ignore[no-untyped-def]
    """
    Does an instance pass all of its permitted value checks (if it has any)?

    Stops at the first failing column. To find out why it failed, use
    :func:`permitted_value_failure_msgs`.
    """
    for attrname, column in gen_camcops_columns(obj):
        checker = column.info.get(
            COLATTR_PERMITTED_VALUE_CHECKER
        )  # type: Optional[PermittedValueChecker]
        if checker is not None and not checker.is_ok(getattr(obj, attrname)):
            return False
    return True
def gen_ancillary_relationships(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[
    Tuple[str, RelationshipProperty, Type["GenericTabletRecordMixin"]],
    None,
    None,
]:
    """
    For an SQLAlchemy ORM object, yields ``attrname, relationship_property,
    related_class`` tuples for every relationship whose ``info`` dictionary
    marks it as a CamCOPS ancillary relationship.
    """
    for relationship in gen_relationships(obj):
        _attrname, rel_prop, _related_class = relationship
        marker = rel_prop.info.get(RelationshipInfo.IS_ANCILLARY, None)
        if marker is True:  # must be exactly True, not merely truthy
            yield _attrname, rel_prop, _related_class
def gen_blob_relationships(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[
    Tuple[str, RelationshipProperty, Type["GenericTabletRecordMixin"]],
    None,
    None,
]:
    """
    For an SQLAlchemy ORM object, yields ``attrname, relationship_property,
    related_class`` tuples for every relationship whose ``info`` dictionary
    marks it as a CamCOPS BLOB relationship.
    """
    for relationship in gen_relationships(obj):
        _attrname, rel_prop, _related_class = relationship
        marker = rel_prop.info.get(RelationshipInfo.IS_BLOB, None)
        if marker is True:  # must be exactly True, not merely truthy
            yield _attrname, rel_prop, _related_class
1741# =============================================================================
1742# Specializations of camcops_column to save typing
1743# =============================================================================
def bool_column(name: str, *args: Any, **kwargs: Any) -> Column[bool]:
    """
    Shorthand for a Boolean
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`.

    An optional ``constraint_name`` keyword argument is consumed by
    ``_get_bool_column_args`` rather than being passed on.
    """
    # Must run before **kwargs is expanded below: the helper mutates kwargs.
    boolean_type = _get_bool_column_args(name, kwargs)

    return camcops_column(name, boolean_type, *args, **kwargs)
def mapped_bool_column(
    name: str, *args: Any, **kwargs: Any
) -> MappedColumn[bool]:
    """
    Shorthand for a Boolean
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.mapped_camcops_column`.

    An optional ``constraint_name`` keyword argument is consumed by
    ``_get_bool_column_args`` rather than being passed on.
    """
    # Must run before **kwargs is expanded below: the helper mutates kwargs.
    boolean_type = _get_bool_column_args(name, kwargs)

    return mapped_camcops_column(name, boolean_type, *args, **kwargs)
def _get_bool_column_args(name: str, kwargs: dict[str, Any]) -> Boolean:
    """
    Prepare the pieces needed to build a Boolean CamCOPS column.

    Modifies ``kwargs`` in place: removes ``constraint_name`` (if present)
    and sets the permitted value checker to ``BIT_CHECKER``.

    Args:
        name: column name; used only to warn about long names that have no
            explicit constraint name
        kwargs: the caller's keyword arguments (mutated)

    Returns:
        the :class:`Boolean` type object to use for the column
    """
    constraint_name = kwargs.pop(
        "constraint_name", None
    )  # type: Optional[str]
    # The "name" parameter to Boolean() specifies the name of the
    # (0, 1) constraint. For ``conv``, see its help.
    type_arg = Boolean(
        name=conv(constraint_name) if constraint_name else None
    )
    kwargs[COLATTR_PERMITTED_VALUE_CHECKER] = BIT_CHECKER

    unnamed_and_long = (
        not constraint_name and len(name) >= LONG_COLUMN_NAME_WARNING_LIMIT
    )
    if unnamed_and_long:
        log.warning(
            "bool_column with long column name and no constraint name: {!r}",
            name,
        )

    return type_arg