Coverage for cc_modules/cc_anon.py: 17%
162 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
1"""
2camcops_server/cc_modules/cc_anon.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Anonymisation functions.**
28Largely superseded by CRATE (https://doi.org/10.1186%2Fs12911-017-0437-1).
30"""
32from collections import OrderedDict
33import csv
34import sys
35from typing import Dict, List, Generator, TextIO, Tuple, TYPE_CHECKING, Union
37from cardinal_pythonlib.sqlalchemy.orm_inspect import coltype_as_typeengine
38from cardinal_pythonlib.sqlalchemy.schema import (
39 convert_sqla_type_for_dialect,
40 does_sqlatype_require_index_len,
41 is_sqlatype_date,
42 is_sqlatype_text_of_length_at_least,
43 RE_COLTYPE_WITH_ONE_PARAM,
44)
45from cardinal_pythonlib.sqlalchemy.session import SQLITE_MEMORY_URL
47# from sqlalchemy.dialects.mssql.base import MSDialect
48from sqlalchemy.dialects.mysql.base import MySQLDialect
49from sqlalchemy.engine import create_engine
50from sqlalchemy.engine.interfaces import Dialect
51from sqlalchemy.orm import Session as SqlASession, sessionmaker
52from sqlalchemy.sql.schema import Column
54from camcops_server.cc_modules.cc_constants import TABLET_ID_FIELD
55from camcops_server.cc_modules.cc_db import FN_PK
56from camcops_server.cc_modules.cc_dump import DumpController
57from camcops_server.cc_modules.cc_patient import Patient
58from camcops_server.cc_modules.cc_patientidnum import (
59 extra_id_colname,
60 EXTRA_IDNUM_FIELD_PREFIX,
61)
62from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions
63from camcops_server.cc_modules.cc_sqla_coltypes import (
64 COLATTR_EXEMPT_FROM_ANONYMISATION,
65 COLATTR_IDENTIFIES_PATIENT,
66 COLATTR_INCLUDE_IN_ANON_STAGING_DB,
67 COLATTR_IS_CAMCOPS_COLUMN,
68 COLATTR_PERMITTED_VALUE_CHECKER,
69)
71if TYPE_CHECKING:
72 from camcops_server.cc_modules.cc_exportrecipientinfo import (
73 ExportRecipientInfo,
74 )
75 from camcops_server.cc_modules.cc_request import CamcopsRequest
78# =============================================================================
79# Constants
80# =============================================================================
82MIN_STRING_LENGTH_TO_CONSIDER_SCRUBBING = 256
85# =============================================================================
86# Write data dictionaries for anonymisation tools
87# =============================================================================
90def _gen_columns_for_anon_staging_db(
91 req: "CamcopsRequest", recipient: "ExportRecipientInfo"
92) -> Generator[Column, None, None]:
93 """
94 Generates all columns for an anonymisation staging database.
95 """
96 url = SQLITE_MEMORY_URL
97 engine = create_engine(url, echo=False)
98 session = sessionmaker(bind=engine)() # type: SqlASession
99 export_options = TaskExportOptions(
100 include_blobs=recipient.db_include_blobs, # type: ignore[arg-type]
101 db_patient_id_per_row=recipient.db_patient_id_per_row, # type: ignore[arg-type] # noqa: E501
102 db_make_all_tables_even_empty=True,
103 db_include_summaries=recipient.db_add_summaries, # type: ignore[arg-type] # noqa: E501
104 )
106 dc = DumpController(
107 dst_engine=engine,
108 dst_session=session,
109 export_options=export_options,
110 req=req,
111 )
112 for col in dc.gen_all_dest_columns():
113 yield col
116# -----------------------------------------------------------------------------
117# CRIS
118# -----------------------------------------------------------------------------
121def _get_type_size_as_text_from_sqltype(sqltype: str) -> Tuple[str, str]:
122 """
123 Splits SQL size definitions like ``VARCHAR(10)`` into tuples like
124 ``('VARCHAR', '10')`` If it doesn't fit that format, return
125 ``(sqltype, '')``.
126 """
127 m = RE_COLTYPE_WITH_ONE_PARAM.match(sqltype)
128 if m is not None:
129 finaltype = m.group("type").upper()
130 size = m.group("size").strip().upper()
131 else:
132 size = ""
133 finaltype = sqltype
134 return finaltype, size
137# noinspection PyUnusedLocal
138def _get_cris_dd_row(
139 column: Union[Column, None],
140 recipient: "ExportRecipientInfo",
141 dest_dialect: Dialect = None,
142) -> Dict:
143 """
144 Args:
145 column:
146 A column specification (or ``None`` to create a dummy dictionary).
147 dest_dialect:
148 The SQL dialect of the destination database. If ``None``, then
149 MySQL is used as the default.
151 Returns:
152 An :class:`OrderedDict` with information for a CRIS data dictionary
153 row.
154 """
155 dest_dialect = dest_dialect or MySQLDialect() # MSDialect() for SQL Server
156 valid_values = None
157 if column is None:
158 # Dummy row
159 colname = None
160 tablename = None
161 taskname = None
162 comment = None
163 feft = None
164 security_status = None
165 finaltype = None
166 tlfa = None
167 size = None
168 else:
169 colname = column.name
170 tablename = column.table.name
171 taskname = tablename
172 comment = column.comment
173 coltype = coltype_as_typeengine(column.type)
174 is_free_text = is_sqlatype_text_of_length_at_least(
175 coltype, min_length=MIN_STRING_LENGTH_TO_CONSIDER_SCRUBBING
176 )
177 exempt_from_anonymisation = False
178 identifies_patient = False
180 if column.info.get(COLATTR_IS_CAMCOPS_COLUMN, False):
181 exempt_from_anonymisation = column.info[
182 COLATTR_EXEMPT_FROM_ANONYMISATION
183 ]
184 identifies_patient = column.info[COLATTR_IDENTIFIES_PATIENT]
185 if column.info[COLATTR_PERMITTED_VALUE_CHECKER]:
186 valid_values = column.info[
187 COLATTR_PERMITTED_VALUE_CHECKER
188 ].permitted_values_csv()
190 needs_scrubbing = is_free_text and not exempt_from_anonymisation
192 # Tag list - fields anon
193 tlfa = "Y" if needs_scrubbing else ""
195 # Destination SQL type
196 desttype = convert_sqla_type_for_dialect(
197 coltype=coltype,
198 dialect=dest_dialect,
199 strip_collation=True,
200 expand_for_scrubbing=needs_scrubbing,
201 )
202 destsqltype = desttype.compile(dialect=dest_dialect)
203 finaltype, size = _get_type_size_as_text_from_sqltype(destsqltype)
205 # Security status
206 system_id = colname == TABLET_ID_FIELD or colname.endswith("_id")
207 patient_idnum_field = colname.startswith(EXTRA_IDNUM_FIELD_PREFIX)
208 internal_field = colname.startswith("_")
209 if identifies_patient and (
210 tablename == Patient.__tablename__
211 and colname == Patient.dob.name # type: ignore[attr-defined]
212 ):
213 security_status = 3 # truncate (e.g. DOB, postcode)
214 elif identifies_patient and tablename == Patient.__tablename__:
215 security_status = 2 # use to scrub
216 elif system_id or internal_field or identifies_patient:
217 security_status = 1 # drop (e.g. for pointless internal keys)
218 else:
219 security_status = 4 # bring through
221 # Front end field type
222 if system_id or patient_idnum_field:
223 feft = 34 # patient ID; other internal keys
224 elif is_sqlatype_date(coltype):
225 feft = 4 # dates
226 elif is_free_text:
227 feft = 3 # giant free text, I think
228 elif valid_values is not None:
229 feft = 2 # picklist
230 else:
231 feft = 1 # text, numbers
233 return OrderedDict(
234 [
235 ("Tab", "CamCOPS"),
236 ("Form name", taskname),
237 ("CRIS tree label", colname),
238 ("Source system table name", tablename),
239 ("SQL column name", colname),
240 ("Front end field type", feft),
241 ("Valid values", valid_values),
242 ("Result column name", colname),
243 ("Family doc tab name", ""),
244 ("Family doc form name", ""),
245 ("Security status", security_status),
246 ("Exclude", ""),
247 ("End SQL Type", finaltype),
248 ("Header field (Y/N)", ""),
249 ("Header field name", ""),
250 ("Header field active (Y/N)", ""),
251 ("View name", ""),
252 ("Exclude from family doc", ""),
253 ("Tag list - fields anon", tlfa),
254 ("Anon type", ""), # formerly "Additional info"
255 ("Form start date", ""),
256 ("Form end date", ""),
257 ("Source", ""),
258 ("Size", size),
259 ("Header logic", ""),
260 ("Patient/contact", ""),
261 ("Comments", comment),
262 ]
263 )
266def write_cris_data_dictionary(
267 req: "CamcopsRequest",
268 recipient: "ExportRecipientInfo",
269 file: TextIO = sys.stdout,
270) -> None:
271 """
272 Generates a draft CRIS data dictionary.
274 CRIS is an anonymisation tool. See
276 - Stewart R, Soremekun M, Perera G, Broadbent M, Callard F, Denis M, Hotopf
277 M, Thornicroft G, Lovestone S (2009).
278 The South London and Maudsley NHS Foundation Trust Biomedical Research
279 Centre (SLAM BRC) case register: development and descriptive data.
280 *BMC Psychiatry* 9: 51.
281 https://www.ncbi.nlm.nih.gov/pubmed/19674459
283 - Fernandes AC, Cloete D, Broadbent MT, Hayes RD, Chang CK, Jackson RG,
284 Roberts A, Tsang J, Soncul M, Liebscher J, Stewart R, Callard F (2013).
285 Development and evaluation of a de-identification procedure for a case
286 register sourced from mental health electronic records.
287 *BMC Med Inform Decis Mak.* 13: 71.
288 https://www.ncbi.nlm.nih.gov/pubmed/23842533
290 Args:
291 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
292 recipient: a :class:`camcops_server.cc_modules.cc_exportrecipientinfo.ExportRecipientInfo`
293 file: output file
294 """ # noqa
295 dummy = _get_cris_dd_row(column=None, recipient=recipient)
296 wr = csv.DictWriter(file, fieldnames=list(dummy.keys()))
297 wr.writeheader()
298 for col in _gen_columns_for_anon_staging_db(req, recipient):
299 d = _get_cris_dd_row(column=col, recipient=recipient)
300 wr.writerow(d)
303# -----------------------------------------------------------------------------
304# CRATE
305# -----------------------------------------------------------------------------
308def _get_crate_dd_row(
309 column: Union[Column, None],
310 recipient: "ExportRecipientInfo",
311 dest_dialect: Dialect = None,
312 src_db: str = "camcops",
313 default_indexlen: int = 100,
314) -> Dict:
315 """
316 Args:
317 column:
318 A column specification (or ``None`` to create a dummy dictionary).
319 recipient:
320 a :class:`camcops_server.cc_modules.cc_exportrecipientinfo.ExportRecipientInfo`
321 dest_dialect:
322 The SQL dialect of the destination database. If ``None``, then
323 MySQL is used as the default.
324 src_db:
325 Value to be used for the "src_db" field.
326 default_indexlen:
327 Default index length for fields that require one.
329 Returns:
330 An :class:`OrderedDict` with information for a CRATE data dictionary
331 row.
332 """ # noqa
333 dest_dialect = dest_dialect or MySQLDialect()
334 exempt_from_anonymisation = False
335 identifies_patient = False
336 identifies_respondent = False
337 force_include = False
338 if column is None:
339 # Dummy row
340 colname = None
341 tablename = None
342 comment = None
343 coltype = None
344 needs_scrubbing = False
345 desttype = None
346 destsqltype = None
347 else:
348 colname = column.name
349 tablename = column.table.name
350 comment = column.comment
351 coltype = coltype_as_typeengine(column.type)
352 is_free_text = is_sqlatype_text_of_length_at_least(
353 coltype, min_length=MIN_STRING_LENGTH_TO_CONSIDER_SCRUBBING
354 )
356 if column.info.get(COLATTR_IS_CAMCOPS_COLUMN, False):
357 exempt_from_anonymisation = column.info[
358 COLATTR_EXEMPT_FROM_ANONYMISATION
359 ]
360 identifies_patient = column.info[COLATTR_IDENTIFIES_PATIENT]
361 force_include = column.info[COLATTR_INCLUDE_IN_ANON_STAGING_DB]
363 needs_scrubbing = is_free_text and not exempt_from_anonymisation
364 desttype = convert_sqla_type_for_dialect(
365 coltype=coltype,
366 dialect=dest_dialect,
367 strip_collation=True,
368 expand_for_scrubbing=needs_scrubbing,
369 )
370 destsqltype = desttype.compile(dialect=dest_dialect)
372 # src_flags
373 src_flags = [] # type: List[str]
374 primary_key = colname == FN_PK
375 if primary_key:
376 src_flags.extend(["K", "C"])
377 primary_pid = (
378 recipient.db_patient_id_per_row
379 and recipient.primary_idnum # otherwise just in PatientIdNum
380 and colname == extra_id_colname(recipient.primary_idnum) # type: ignore[arg-type] # noqa: E501
381 )
382 if primary_pid:
383 src_flags.append("P")
384 defines_primary_pids = False # no single unique table for this...
385 if defines_primary_pids:
386 src_flags.append("*")
387 master_pid = False # not supported for now
388 if master_pid:
389 src_flags.append("M")
391 # scrub_src
392 if identifies_patient and tablename == Patient.__tablename__:
393 scrub_src = "patient"
394 elif identifies_respondent:
395 scrub_src = "thirdparty"
396 else:
397 scrub_src = None
399 # scrub_method
400 scrub_method = None # default is fine
402 # Include in output?
403 include = (
404 force_include
405 or primary_key
406 or primary_pid
407 or master_pid
408 or not (identifies_patient or identifies_respondent)
409 )
411 # alter_method
412 if needs_scrubbing:
413 alter_method = "scrub"
414 elif (
415 tablename == Patient.__tablename__
416 and colname == Patient.dob.name # type: ignore[attr-defined]
417 ):
418 alter_method = "truncate_date"
419 else:
420 alter_method = None
422 # Indexing
423 crate_index = None
424 crate_indexlen = None
425 if column is not None and column.index:
426 crate_index = "U" if column.unique else "I"
427 if does_sqlatype_require_index_len(desttype):
428 crate_indexlen = default_indexlen
430 return OrderedDict(
431 [
432 ("src_db", src_db),
433 ("src_table", tablename),
434 ("src_field", colname),
435 ("src_datatype", str(coltype)),
436 ("src_flags", "".join(src_flags) if src_flags else None),
437 ("scrub_src", scrub_src),
438 ("scrub_method", scrub_method),
439 ("decision", "include" if include else "OMIT"),
440 ("inclusion_values", None),
441 ("exclusion_values", None),
442 ("alter_method", alter_method),
443 ("dest_table", tablename),
444 ("dest_field", colname),
445 ("dest_datatype", destsqltype),
446 ("index", crate_index),
447 ("indexlen", crate_indexlen),
448 ("comment", comment),
449 ]
450 )
453def write_crate_data_dictionary(
454 req: "CamcopsRequest",
455 recipient: "ExportRecipientInfo",
456 file: TextIO = sys.stdout,
457) -> None:
458 """
459 Generates a draft CRATE data dictionary.
461 CRATE is an anonymisation tool. See:
463 - Cardinal RN (2017).
464 Clinical records anonymisation and text extraction (CRATE): an
465 open-source software system.
466 *BMC Medical Informatics and Decision Making* 17: 50.
467 https://www.pubmed.gov/28441940;
468 https://doi.org/10.1186/s12911-017-0437-1.
470 - https://crateanon.readthedocs.io/
472 Args:
473 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
474 recipient: a :class:`camcops_server.cc_modules.cc_exportrecipientinfo.ExportRecipientInfo`
475 file: output file
476 """ # noqa
477 dummy = _get_crate_dd_row(column=None, recipient=recipient)
478 wr = csv.DictWriter(file, fieldnames=list(dummy.keys()))
479 wr.writeheader()
480 for col in _gen_columns_for_anon_staging_db(req, recipient):
481 d = _get_crate_dd_row(column=col, recipient=recipient)
482 wr.writerow(d)