Coverage for anonymise/models.py: 86%
94 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
1"""
2crate_anon/anonymise/models.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**SQLAlchemy ORM models for the CRATE anonymiser, representing information it
27stores in its admin database.**
29To create a SQLAlchemy Table programmatically:
31- https://docs.sqlalchemy.org/en/latest/core/schema.html
32- https://stackoverflow.com/questions/5424942/sqlalchemy-model-definition-at-execution
33- https://stackoverflow.com/questions/2580497/database-on-the-fly-with-scripting-languages/2580543#2580543
35To create a SQLAlchemy ORM programmatically:
37- https://stackoverflow.com/questions/2574105/sqlalchemy-dynamic-mapping/2575016#2575016
38""" # noqa: E501
40import logging
41import random
42from typing import Optional, TYPE_CHECKING, Union
44from cardinal_pythonlib.sqlalchemy.orm_query import exists_orm
45from sqlalchemy import (
46 Column,
47 Text,
48)
49from sqlalchemy.exc import IntegrityError
50from sqlalchemy.orm.exc import NoResultFound
51from sqlalchemy.orm.session import Session
53from crate_anon.anonymise import SecretBase
54from crate_anon.anonymise.config_singleton import config
55from crate_anon.anonymise.constants import (
56 MAX_TRID,
57 PatientInfoConstants,
58 TABLE_KWARGS,
59 TridType,
60)
62if TYPE_CHECKING:
63 from crate_anon.anonymise.scrub import PersonalizedScrubber
65log = logging.getLogger(__name__)
68class PatientInfo(SecretBase):
69 """
70 Represent patient information in the secret admin database.
72 Design decision in this class:
74 - It gets too complicated if you try to make the fieldnames arbitrary and
75 determined by the config.
77 - So we always use 'pid', 'rid', etc.
79 - Older config settings that this decision removes:
81 .. code-block:: none
83 mapping_patient_id_fieldname
84 mapping_master_id_fieldname
86 - Note that the following are still actively used, as they can be used
87 to set the names in the OUTPUT database (not the mapping database):
89 .. code-block:: none
91 research_id_fieldname
92 trid_fieldname
93 master_research_id_fieldname
94 source_hash_fieldname
96 - The config is allowed to set three column types:
98 - the source PID type (e.g. INT, BIGINT, VARCHAR)
99 - the source MPID type (e.g. BIGINT)
100 - the encrypted (RID, MRID) type, which is set by the encryption
101 algorithm; e.g. VARCHAR(128) for SHA-512.
102 """
104 __tablename__ = PatientInfoConstants.SECRET_MAP_TABLENAME
105 __table_args__ = TABLE_KWARGS
107 pid = Column(
108 PatientInfoConstants.PID_FIELDNAME,
109 config.pidtype,
110 primary_key=True,
111 autoincrement=False,
112 comment="Patient ID (PID) (PK)",
113 )
114 rid = Column(
115 PatientInfoConstants.RID_FIELDNAME,
116 config.sqltype_encrypted_pid,
117 nullable=False,
118 unique=True,
119 comment="Research ID (RID)",
120 )
121 trid = Column(
122 PatientInfoConstants.TRID_FIELDNAME,
123 TridType,
124 unique=True,
125 comment="Transient integer research ID (TRID)",
126 )
127 mpid = Column(
128 PatientInfoConstants.MPID_FIELDNAME,
129 config.mpidtype,
130 comment="Master patient ID (MPID)",
131 )
132 mrid = Column(
133 PatientInfoConstants.MRID_FIELDNAME,
134 config.sqltype_encrypted_pid,
135 comment="Master research ID (MRID)",
136 )
137 scrubber_hash = Column(
138 "scrubber_hash",
139 config.sqltype_encrypted_pid,
140 comment="Scrubber hash (for change detection)",
141 )
142 patient_scrubber_text = Column(
143 "_raw_scrubber_patient",
144 Text,
145 comment="Raw patient scrubber (for debugging only)",
146 )
147 tp_scrubber_text = Column(
148 "_raw_scrubber_tp",
149 Text,
150 comment="Raw third-party scrubber (for debugging only)",
151 )
153 def ensure_rid(self) -> None:
154 """
155 Ensure that :attr:`rid` is a hashed version of :attr:`pid`.
156 """
157 assert self.pid is not None
158 if self.rid is not None:
159 return
160 self.rid = config.encrypt_primary_pid(self.pid)
162 def ensure_trid(self, session: Session) -> None:
163 """
164 Ensure that :attr:`trid` is a suitable transient research ID
165 (TRID): the TRID we have already generated for this PID, or a fresh
166 random integer that we'll remember.
168 Args:
169 session: SQLAlchemy database session for the secret admin database
170 """
171 assert self.pid is not None
172 if self.trid is not None:
173 return
174 # noinspection PyTypeChecker
175 self.trid = TridRecord.get_trid(session, self.pid)
177 def set_mpid(self, mpid: Union[int, str]) -> None:
178 """
179 Sets the MPID, and at the same time, the MRID (a hashed version of the
180 MPID).
182 Args:
183 mpid: master patient ID (MPID) value
184 """
185 self.mpid = mpid
186 self.mrid = config.encrypt_master_pid(self.mpid)
188 def set_scrubber_info(self, scrubber: "PersonalizedScrubber") -> None:
189 """
190 Sets our :attr:`scrubber_hash` to be the hash of the scrubber passed as
191 a parameter.
193 If our :class:`crate_anon.anonymise.config.Config` has its
194 ``save_scrubbers`` flag set, then we also save the textual regex
195 string for the patient scrubber and the third-party scrubber.
197 Args:
198 scrubber: :class:`crate_anon.anonymise.scrub.PersonalizedScrubber`
199 """
200 self.scrubber_hash = scrubber.get_hash()
201 if config.save_scrubbers:
202 self.patient_scrubber_text = scrubber.get_patient_regex_string()
203 self.tp_scrubber_text = scrubber.get_tp_regex_string()
204 else:
205 self.patient_scrubber_text = None # type: Optional[str]
206 self.tp_scrubber_text = None # type: Optional[str]
209class TridRecord(SecretBase):
210 """
211 Records the mapping from patient ID (PID) to integer transient research ID
212 (TRID), and makes new TRIDs as required.
213 """
215 __tablename__ = "secret_trid_cache"
216 __table_args__ = TABLE_KWARGS
218 pid = Column(
219 "pid",
220 config.pidtype,
221 primary_key=True,
222 autoincrement=False,
223 comment="Patient ID (PID) (PK)",
224 )
225 trid = Column(
226 "trid",
227 TridType,
228 nullable=False,
229 unique=True,
230 comment="Transient integer research ID (TRID)",
231 )
233 @classmethod
234 def get_trid(cls, session: Session, pid: Union[int, str]) -> int:
235 """
236 Looks up the PID in the database and returns its corresponding TRID.
237 If there wasn't one, make a new one, store the mapping, and return the
238 new TRID.
240 Args:
241 session: SQLAlchemy database session for the secret admin database
242 pid: patient ID (PID) value
244 Returns:
245 integer TRID
247 """
248 try:
249 obj = session.query(cls).filter(cls.pid == pid).one()
250 return obj.trid
251 except NoResultFound:
252 return cls.new_trid(session, pid)
254 @classmethod
255 def new_trid(cls, session: Session, pid: Union[int, str]) -> int:
256 """
257 Creates a new TRID: a random integer that's not yet been used as a
258 TRID.
260 We check for existence by inserting and asking the database if it's
261 happy, not by asking the database if it exists (since other processes
262 may be doing the same thing at the same time).
264 Note that MAX_TRID (currently 2.1 billion) must far exceed the number
265 of patients on the system otherwise this will be slow or in the worst
266 case get into an infinite loop. If we choose to redesign this in future
267 we could consider using UUIDs instead of integers. This would also
268 avoid the need for the IntegrityError check and rollback.
269 """
270 while True:
271 session.commit()
272 candidate = random.randint(1, MAX_TRID)
273 log.debug(f"Trying candidate TRID: {candidate}")
274 # noinspection PyArgumentList
275 obj = cls(pid=pid, trid=candidate)
276 try:
277 session.add(obj)
278 session.commit() # may raise IntegrityError
279 return candidate
280 except IntegrityError:
281 session.rollback()
284class OptOutPid(SecretBase):
285 """
286 Records the PID values of patients opting out of the anonymised database.
287 """
289 __tablename__ = "opt_out_pid"
290 __table_args__ = TABLE_KWARGS
292 pid = Column(
293 "pid",
294 config.pidtype,
295 primary_key=True,
296 autoincrement=False,
297 comment="Patient ID",
298 )
299 # If autoincrement is not specified, it becomes "auto", which turns on the
300 # autoincrement behaviour if the column is of integer type and a primary
301 # key (see sqlalchemy.sql.schema.Column.__init__). In turn, that (under SQL
302 # Server) likely makes it an IDENTITY column, this being an SQL Server
303 # mechanism for auto-incrementing
304 # (https://docs.microsoft.com/en-us/sql/t-sql/statements/create-table-transact-sql-identity-property?view=sql-server-ver16). # noqa: E501
305 # And in turn that means that when a value is explicitly inserted, it gives
306 # the error "Cannot insert explicit value for identity column in table
307 # 'opt_out_pid' when IDENTITY_INSERT is set to OFF. (544)"
309 @classmethod
310 def opting_out(cls, session: Session, pid: Union[int, str]) -> bool:
311 """
312 Is this patient opting out?
314 Args:
315 session: SQLAlchemy database session for the secret admin database
316 pid: PID of the patient to test
318 Returns:
319 opting out?
321 """
322 return exists_orm(session, cls, cls.pid == pid)
324 @classmethod
325 def add(cls, session: Session, pid: Union[int, str]) -> None:
326 """
327 Add a record of a patient who wishes to opt out.
329 Args:
330 session: SQLAlchemy database session for the secret admin database
331 pid: PID of the patient who is opting out
332 """
333 log.debug(f"Adding opt-out for PID {pid}")
334 # noinspection PyArgumentList
335 newthing = cls(pid=pid)
336 session.merge(newthing)
337 # https://stackoverflow.com/questions/12297156/fastest-way-to-insert-object-if-it-doesnt-exist-with-sqlalchemy # noqa: E501
340class OptOutMpid(SecretBase):
341 """
342 Records the MPID values of patients opting out of the anonymised database.
343 """
345 __tablename__ = "opt_out_mpid"
346 __table_args__ = TABLE_KWARGS
348 mpid = Column(
349 "mpid",
350 config.mpidtype,
351 primary_key=True,
352 autoincrement=False,
353 comment="Patient ID",
354 )
355 # See OptOutPid above re autoincrement.
357 @classmethod
358 def opting_out(cls, session: Session, mpid: Union[int, str]) -> bool:
359 """
360 Is this patient opting out?
362 Args:
363 session: SQLAlchemy database session for the secret admin database
364 mpid: MPID of the patient to test
366 Returns:
367 opting out?
369 """
370 return exists_orm(session, cls, cls.mpid == mpid)
372 @classmethod
373 def add(cls, session: Session, mpid: Union[int, str]) -> None:
374 """
375 Add a record of a patient who wishes to opt out.
377 Args:
378 session: SQLAlchemy database session for the secret admin database
379 mpid: MPID of the patient who is opting out
380 """
381 log.debug(f"Adding opt-out for MPID {mpid}")
382 # noinspection PyArgumentList
383 newthing = cls(mpid=mpid)
384 session.merge(newthing)