Coverage for anonymise/patient.py: 83%

87 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-27 10:34 -0500

1""" 

2crate_anon/anonymise/patient.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Patient class for CRATE anonymiser. Represents patient-specific information 

27like ID values and scrubbers.** 

28 

29""" 

30 

31import logging 

32from typing import AbstractSet, Any, Generator, List, Union 

33 

34from sqlalchemy.orm.session import Session 

35from sqlalchemy.sql import column, select, table 

36 

37from crate_anon.anonymise.config_singleton import config 

38from crate_anon.anonymise.dd import ScrubSourceFieldInfo 

39from crate_anon.anonymise.models import PatientInfo 

40from crate_anon.anonymise.scrub import PersonalizedScrubber 

41 

42log = logging.getLogger(__name__) 

43 

44 

45# ============================================================================= 

46# Generate identifiable values for a patient 

47# ============================================================================= 

48 

49 

def gen_all_values_for_patient(
    session: Session,
    tablename: str,
    scrub_src_fieldinfo: List[ScrubSourceFieldInfo],
    pid_field: str,
    pid: Union[int, str],
) -> Generator[List[Any], None, None]:
    """
    Generate all sensitive (``scrub_src``) values for a given patient, from a
    given source table. Used to build the scrubber.

    Args:
        session:
            database session
        tablename:
            source table
        scrub_src_fieldinfo:
            list of information about the scrub-source fields; the
            ``value_fieldname`` of each entry is used as a column to select.
            If this list is empty, nothing is queried and nothing is yielded.
        pid_field:
            field to query for patient ID
        pid:
            patient ID

    Yields:
        rows, where each row is a sequence of values whose order matches
        that of ``scrub_src_fieldinfo``.
    """
    if not scrub_src_fieldinfo:
        # No columns to fetch. Building a SELECT with an empty column list
        # would not give us anything useful (and is presumably rejected by
        # some database backends), so skip the query altogether.
        return

    wanted_columns = [column(i.value_fieldname) for i in scrub_src_fieldinfo]
    query = (
        select(*wanted_columns)
        .where(column(pid_field) == pid)
        .select_from(table(tablename))
    )
    result = session.execute(query)
    for row in result:
        # Lazy %-style arguments: the row is only converted to text if debug
        # logging is actually enabled, which matters because this runs once
        # per source row.
        log.debug("... gen_all_values_for_patient yielding row: %s", row)
        yield row

86 

87 

88# ============================================================================= 

89# Patient class, which hosts the patient-specific scrubber 

90# ============================================================================= 

91 

92 

class Patient:
    """
    Class representing a patient with patient-specific information, such as
    PIDs, RIDs, and scrubbers.

    Constructing a :class:`Patient` fetches (or creates) the corresponding
    :class:`PatientInfo` record via the admin database session, builds a
    :class:`PersonalizedScrubber` from the source databases, and commits.
    """

    def __init__(self, pid: Union[int, str], debug: bool = False) -> None:
        """
        Build the scrubber based on data dictionary information, found via
        our singleton :class:`crate_anon.anonymise.config.Config`.

        Args:
            pid: integer or string (usually integer) patient identifier
            debug: turn on scrubber debugging?
        """
        self._pid = pid
        self._session = config.admindb.session

        # Fetch or create PatientInfo object
        self._info = self._session.get(PatientInfo, pid)
        if self._info is None:
            self._info = PatientInfo(pid=pid)
            self._info.ensure_rid()
            self._info.ensure_trid(self._session)
            self._session.add(self._info)

        # Scrubber
        self.scrubber = PersonalizedScrubber(
            anonymise_codes_at_word_boundaries_only=(
                config.anonymise_codes_at_word_boundaries_only
            ),
            anonymise_codes_at_numeric_boundaries_only=(
                config.anonymise_codes_at_numeric_boundaries_only
            ),
            anonymise_dates_at_word_boundaries_only=(
                config.anonymise_dates_at_word_boundaries_only
            ),
            anonymise_numbers_at_word_boundaries_only=(
                config.anonymise_numbers_at_word_boundaries_only
            ),
            anonymise_numbers_at_numeric_boundaries_only=(
                config.anonymise_numbers_at_numeric_boundaries_only
            ),
            anonymise_strings_at_word_boundaries_only=(
                config.anonymise_strings_at_word_boundaries_only
            ),
            debug=debug,
            hasher=config.change_detection_hasher,
            min_string_length_for_errors=config.min_string_length_for_errors,
            min_string_length_to_scrub_with=(
                config.min_string_length_to_scrub_with
            ),
            nonspecific_scrubber=config.nonspecific_scrubber,
            replacement_text_patient=config.replace_patient_info_with,
            replacement_text_third_party=config.replace_third_party_info_with,
            scrub_string_suffixes=config.scrub_string_suffixes,
            string_max_regex_errors=config.string_max_regex_errors,
            allowlist=config.allowlist,
            alternatives=config.phrase_alternative_words,
        )

        # Add information to the scrubber from the database.
        # We go through all "scrub-from" fields in the data dictionary. We
        # collect all values of those fields from the source database.
        log.debug(f"Building scrubber: pid = {pid!r}")
        # Related (third-party) patient IDs already explored, as integers:
        self._third_party_pids_seen = set()
        self._db_table_pair_list = config.dd.get_scrub_from_db_table_pairs()
        # Work on a copy: signatures are discarded from it as values are seen.
        self._mandatory_scrubbers_unfulfilled = (
            config.dd.get_mandatory_scrubber_sigs().copy()
        )
        self._build_scrubber(
            pid, depth=0, max_depth=config.thirdparty_xref_max_depth
        )
        # Compare against the stored hash BEFORE overwriting it below.
        self._unchanged = self.scrubber_hash == self._info.scrubber_hash
        self._info.set_scrubber_info(self.scrubber)

        # May raise DatabaseError (includes OperationalError, ProgrammingError)
        # If insert failed due to invalid mpid
        self._session.commit()
        # Commit immediately, because other processes may need this table
        # promptly. Otherwise, might get:
        #   Deadlock found when trying to get lock; try restarting transaction

    def _build_scrubber(
        self, pid: Union[int, str], depth: int, max_depth: int
    ) -> None:
        """
        Build the scrubber for this patient.

        We do this by finding all this patient's values within the "scrub from"
        columns of the source database, and adding them to our patient scrubber
        (or third-party scrubber as the case may be, for information about
        relatives etc.), according to the scrub method defined in the data
        dictionary row.

        Args:
            pid:
                Integer or string (usually integer) patient identifier.
            depth:
                Current recursion depth for third-party information. If this
                is greater than 0, we are dealing with third-party information.
            max_depth:
                Maximum recursion depth for third-party information.

        Raises:
            ValueError:
                if the data dictionary reports no patient ID field for a
                scrub-source table
        """
        if depth > 0:
            log.debug(f"Building scrubber recursively: depth = {depth}")
        # ---------------------------------------------------------------------
        # For all source tables with scrub-source information...
        # ---------------------------------------------------------------------
        for src_db, src_table in self._db_table_pair_list:
            session = config.sources[src_db].session
            # -----------------------------------------------------------------
            # Build a list of scrub-from fields for this table.
            # -----------------------------------------------------------------
            scrubsrc_infolist = config.dd.get_scrub_from_rows_as_fieldinfo(
                src_db=src_db,
                src_table=src_table,
                depth=depth,
                max_depth=max_depth,
            )
            pid_field = config.dd.get_pid_name(src_db, src_table)
            if not pid_field:
                # Shouldn't happen -- part of the data dictionary checks.
                raise ValueError(
                    f"Scrub-source table {src_db}.{src_table} "
                    f"has no identifiable patient ID field"
                )
            # -----------------------------------------------------------------
            # Collect the actual patient-specific values for this table.
            # -----------------------------------------------------------------
            for values in gen_all_values_for_patient(
                session=session,
                tablename=src_table,
                scrub_src_fieldinfo=scrubsrc_infolist,
                pid_field=pid_field,
                pid=pid,
            ):
                # The order of "values" matches that of "scrubsrc_infolist".
                for info, val in zip(scrubsrc_infolist, values):
                    self._add_scrub_value(val, info, depth, max_depth)

    def _add_scrub_value(
        self,
        val: Any,
        info: "ScrubSourceFieldInfo",
        depth: int,
        max_depth: int,
    ) -> None:
        """
        Process a single scrub-source value: add it to the scrubber, note the
        MPID if applicable, recurse into a related patient's record if
        applicable, and update the mandatory-scrubber bookkeeping.

        Args:
            val:
                The value read from the source database (may be ``None``).
            info:
                Field information describing how ``val`` should be treated.
            depth:
                Current recursion depth for third-party information.
            max_depth:
                Maximum recursion depth for third-party information.
        """
        # ---------------------------------------------------------------------
        # Add a value to the scrubber
        # ---------------------------------------------------------------------
        self.scrubber.add_value(
            val, info.scrub_method, patient=info.is_patient
        )

        if info.is_mpid and self.mpid is None:
            # -----------------------------------------------------------------
            # We've come across the MPID for the first time.
            # -----------------------------------------------------------------
            self.set_mpid(val)

        if info.recurse:
            # -----------------------------------------------------------------
            # We've come across a patient ID of another patient, whose
            # information should be trawled and treated as third-party
            # information
            # -----------------------------------------------------------------
            try:
                related_pid = int(val)
            except (ValueError, TypeError):
                # TypeError: NULL value (None)
                # ValueError: duff value, i.e. non-integer
                related_pid = None
            # Don't bother doing the same relative twice (if their ID occurs
            # in more than one place in the patient's record); that's
            # inefficient.
            if (
                related_pid is not None
                and related_pid not in self._third_party_pids_seen
            ):
                self._third_party_pids_seen.add(related_pid)
                # Go and explore that other patient's record:
                self._build_scrubber(related_pid, depth + 1, max_depth)

        # If this is a mandatory scrubber, note if its requirement has been
        # fulfilled. This check deliberately runs for every value, including
        # values in "recurse" fields that were unusable as a related PID or
        # whose related PID had already been explored: the value itself has
        # still been seen and added to the scrubber above.
        if val is not None and info.required_scrubber:
            self._mandatory_scrubbers_unfulfilled.discard(info.signature)

    @property
    def mandatory_scrubbers_unfulfilled(self) -> AbstractSet[str]:
        """
        Returns a set of strings (each of the format ``db.table.column``) for
        all "required scrubber" fields that have not yet had information seen
        for them (for this patient), and are therefore unfulfilled.

        See also
        :meth:`crate_anon.anonymise.dd.DataDictionary.get_mandatory_scrubber_sigs`.
        """
        return self._mandatory_scrubbers_unfulfilled

    @property
    def pid(self) -> Union[int, str]:
        """
        Return the patient ID (PID).
        """
        return self._info.pid

    @property
    def mpid(self) -> Union[int, str, None]:
        """
        Return the master patient ID (MPID), or ``None`` if none has been
        set yet.
        """
        return self._info.mpid

    def set_mpid(self, mpid: Union[int, str]) -> None:
        """
        Set the patient MPID.
        """
        self._info.set_mpid(mpid)

    @property
    def rid(self) -> str:
        """
        Returns the RID (encrypted PID).
        """
        return self._info.rid

    @property
    def mrid(self) -> str:
        """
        Returns the master RID (encrypted MPID).
        """
        return self._info.mrid

    @property
    def trid(self) -> int:
        """
        Returns the transient integer RID (TRID).
        """
        return self._info.trid

    @property
    def scrubber_hash(self) -> str:
        """
        Return the hash of our scrubber (for change detection).
        """
        return self.scrubber.get_hash()

    def scrub(self, text: str) -> str:
        """
        Use our scrubber to scrub text.

        Args:
            text: the raw text, potentially containing sensitive information

        Returns:
            the de-identified text
        """
        return self.scrubber.scrub(text)

    def is_unchanged(self) -> bool:
        """
        Is the scrubber unchanged, compared to the previous hashed version in
        the admin database?

        Returns:
            ``True`` if the scrubber hash computed at construction matched the
            hash previously stored for this patient; ``False`` otherwise.
        """
        return self._unchanged