Coverage for preprocess/text_extractor.py: 97%

117 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2026-02-06 03:32 -0600

1""" 

2crate_anon/preprocess/text_extractor.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Extract text from a document store prior to anonymisation.** 

27 

28""" 

29 

30import logging 

31import os 

32from pathlib import Path 

33import re 

34import traceback 

35from typing import Generator, Optional, Tuple 

36 

37from cardinal_pythonlib.extract_text import ( 

38 document_to_text, 

39 ext_map, 

40 TextProcessingConfig, 

41) 

42from cardinal_pythonlib.sqlalchemy.schema import ( 

43 make_bigint_autoincrement_column, 

44) 

45from pendulum import DateTime as Pendulum 

46from sqlalchemy import ( 

47 BigInteger, 

48 Column, 

49 DateTime, 

50 insert, 

51 select, 

52 String, 

53 Table, 

54 UnicodeText, 

55 update, 

56) 

57from sqlalchemy.engine.base import Engine 

58from sqlalchemy.exc import IntegrityError, MultipleResultsFound, NoResultFound 

59from sqlalchemy.sql.schema import MetaData 

60 

61from crate_anon.anonymise.constants import AnonymiseConfigDefaults 

62from crate_anon.common.sql import add_indexes, IndexCreationInfo 

63 

64from crate_anon.preprocess.constants import ( 

65 CRATE_COL_FILE_PATH, 

66 CRATE_COL_PK, 

67 CRATE_COL_TEXT, 

68 CRATE_COL_TEXT_LAST_EXTRACTED, 

69 CRATE_IDX_PREFIX, 

70 CRATE_TABLE_EXTRACTED_TEXT, 

71) 

72from crate_anon.preprocess.systmone_ddgen import ( 

73 contextual_tablename, 

74 S1GenericCol, 

75 S1Table, 

76 SystmOneContext, 

77) 

78 

79log = logging.getLogger(__name__) 

80 

81 

class TextExtractor:
    """
    Base class for extracting text from files beneath a root directory and
    storing the results in a database table.

    Derived classes must implement :meth:`process_files` and
    :meth:`get_table_definition`, and must provide ``indexed_column_names``.
    """

    def __init__(
        self,
        engine: Engine,
        metadata: MetaData,
        context: SystmOneContext,
        root_directory: str,
        drop_table: bool = False,
        plain: bool = AnonymiseConfigDefaults.EXTRACT_TEXT_PLAIN,
        width: int = AnonymiseConfigDefaults.EXTRACT_TEXT_WIDTH,
    ) -> None:
        """
        Args:
            engine:
                SQLAlchemy engine used to create, drop and index the
                extracted-text table.
            metadata:
                SQLAlchemy metadata in which the extracted-text table is
                looked up and defined.
            context:
                SystmOne context; stored for use by derived classes.
            root_directory:
                Directory that is walked (recursively) to find files.
            drop_table:
                If true, and the extracted-text table is already present in
                ``metadata``, drop it and create it afresh.
            plain:
                Passed to ``TextProcessingConfig`` as ``plain``.
            width:
                Passed to ``TextProcessingConfig`` as ``width``.
        """
        self.engine = engine
        self.metadata = metadata
        self.context = context
        self.root_directory = root_directory
        self.drop_table = drop_table
        self.plain = plain
        self.width = width

        # Populated by create_table(); initialised here so that the attribute
        # always exists.
        self.extracted_text_table: Optional[Table] = None

        # Supported extensions are the keys of ext_map, excluding the None
        # key. Filtering (rather than list.remove) avoids a ValueError if
        # there is no None key.
        self.extensions = [ext for ext in ext_map if ext is not None]

    def extract_all(self) -> None:
        """
        Ensure the destination table exists, then process all files.
        """
        self.create_table()
        self.process_files()

    def create_table(self) -> None:
        """
        Create (and index) the extracted-text table if it is not already
        present in the metadata, first dropping any existing table when
        ``drop_table`` was requested.
        """
        existing = self.metadata.tables.get(CRATE_TABLE_EXTRACTED_TEXT)
        self.extracted_text_table = existing

        if existing is not None and self.drop_table:
            # Pass the engine explicitly, as for create() below; SQLAlchemy
            # 2.x requires a bind for Table.drop().
            existing.drop(self.engine, checkfirst=True)
            # Forget the old definition so that get_table_definition() can
            # define a table of the same name in this metadata again.
            self.metadata.remove(existing)
            existing = None

        if existing is None:
            self.extracted_text_table = self.get_table_definition()
            self.extracted_text_table.create(self.engine, checkfirst=True)
            self.index_table()

    def generate_filenames(self) -> Generator[Tuple[str, str], None, None]:
        """
        Walk the root directory.

        Yields:
            tuple: ``dirpath, filename`` for every file found.
        """
        log.info(f"Extracting text from {self.root_directory}...")
        for dirpath, _dirnames, filenames in os.walk(self.root_directory):
            log.debug(f"Processing {dirpath}")
            for filename in filenames:
                yield dirpath, filename

    def index_table(self) -> None:
        """
        Add a non-unique index for each column of the extracted-text table
        whose name appears in ``indexed_column_names``.
        """
        for column in self.extracted_text_table.columns:
            colname = column.name
            if colname not in self.indexed_column_names:
                continue

            add_indexes(
                self.engine,
                self.extracted_text_table,
                [
                    IndexCreationInfo(
                        index_name=f"{CRATE_IDX_PREFIX}_{colname}",
                        column=colname,
                        unique=False,
                    )
                ],
            )

    def process_files(self) -> None:
        """
        Process every file and store its text. Must be overridden.
        """
        raise NotImplementedError(
            "Implement 'process_files()' in derived class!"
        )

    @property
    def indexed_column_names(self) -> list[str]:
        """
        Names of the columns to index. Must be overridden.
        """
        raise NotImplementedError(
            "Implement 'indexed_column_names' property in derived class!"
        )

    def get_table_definition(self) -> Table:
        """
        Return the definition of the extracted-text table. Must be
        overridden.
        """
        raise NotImplementedError(
            "Implement 'get_table_definition()' in derived class!"
        )

    def extract_text_from_file(
        self, full_path: str, extension: str
    ) -> Tuple[Optional[str], Optional[Pendulum]]:
        """
        Extract text from a single file.

        Args:
            full_path:
                Path of the file, passed to ``document_to_text``.
            extension:
                File extension, checked against the supported extensions.

        Returns:
            tuple: ``text, last_extracted``. Both are ``None`` if the
            extension is unsupported, if extraction raised an exception, or
            if extraction produced ``None``; otherwise ``last_extracted`` is
            the current time.
        """
        # TODO: Read last_extracted and only update if None (or do
        # something clever by checking when the file was last written)
        if extension not in self.extensions:
            log.info(f"... unsupported file extension '{extension}'.")
            return None, None

        log.info("... extracting text...")
        try:
            config = TextProcessingConfig(width=self.width, plain=self.plain)
            text = document_to_text(filename=full_path, config=config)
            log.info("... extracted.")
        except Exception as e:
            # Deliberately best-effort: one unreadable file must not stop
            # the whole run.
            traceback.print_exc()
            log.error(f"... caught exception from document_to_text: {e}")
            return None, None

        if text is None:
            return None, None

        return text, Pendulum.now()

187 

188 

class SystmOneTextExtractor(TextExtractor):
    """
    Text extractor for SystmOne documents. Files whose names match the
    expected pattern are looked up in the SystmOne documents table by
    ``RowIdentifier``, and their text is written to the extracted-text table.
    """

    # Columns of the extracted-text table that receive an index.
    indexed_column_names = [
        CRATE_COL_PK,
        S1GenericCol.ROW_ID,
        S1GenericCol.PATIENT_ID,
    ]

    def __init__(self, *args, **kwargs) -> None:
        """
        Takes the same arguments as :class:`TextExtractor`, and additionally
        looks up the documents table in the metadata.

        Raises:
            KeyError: if the documents table is not present in the metadata.
        """
        super().__init__(*args, **kwargs)
        documents_tablename = contextual_tablename(
            S1Table.DOCUMENTS, self.context
        )
        self.documents_table = self.metadata.tables[documents_tablename]

    def get_table_definition(self) -> Table:
        """
        Define the extracted-text table within ``self.metadata``.
        """
        return Table(
            CRATE_TABLE_EXTRACTED_TEXT,
            self.metadata,
            make_bigint_autoincrement_column(CRATE_COL_PK),
            Column(
                S1GenericCol.ROW_ID,
                BigInteger,
                comment="FK to S1_Documents",
                nullable=False,
            ),
            Column(
                S1GenericCol.PATIENT_ID,
                BigInteger,
                comment="Patient ID from S1_Documents",
                nullable=False,
            ),
            Column(
                "DocumentUID",
                String(16),
                comment="Unique ID of document",
                nullable=False,
            ),
            Column(
                CRATE_COL_FILE_PATH,
                String(255),
                comment="Path relative to docstore",
                unique=True,
            ),
            Column(
                CRATE_COL_TEXT,
                UnicodeText,
                comment="Extracted text from file",
            ),
            Column(
                CRATE_COL_TEXT_LAST_EXTRACTED,
                DateTime,
                comment="Date/time text was last extracted",
            ),
        )

    def process_files(self) -> None:
        """
        For each matching file: find its row in the documents table, extract
        its text, and insert a row into the extracted-text table (or update
        the existing row for that file path if the insert is rejected).

        Files with no matching documents row, or more than one, are logged
        and skipped.
        """
        with self.engine.connect() as connection:
            for (
                full_path,
                row_identifier,
                document_uid,
                extension,
            ) in self.generate_matches():
                log.info(f"Processing {full_path}...")
                statement = select(self.documents_table).where(
                    self.documents_table.c.RowIdentifier == row_identifier
                )
                try:
                    row = connection.execute(statement).one()
                except NoResultFound:
                    log.error(
                        f"... no row found for RowIdentifier: {row_identifier}"
                    )
                    continue
                except MultipleResultsFound:
                    log.error(
                        "... multiple rows found with RowIdentifier: "
                        f"{row_identifier}"
                    )
                    continue

                id_patient = row._mapping[S1GenericCol.PATIENT_ID]
                text, last_extracted = self.extract_text_from_file(
                    full_path, extension
                )

                # Keep only the last two path components (the immediate
                # parent directory and the filename).
                relative_path = str(Path(*Path(full_path).parts[-2:]))
                values = dict(
                    RowIdentifier=row_identifier,
                    DocumentUID=document_uid,
                    IDPatient=id_patient,
                    crate_file_path=relative_path,
                    crate_text=text,
                    crate_text_last_extracted=last_extracted,
                )

                statement = insert(self.extracted_text_table).values(**values)
                try:
                    connection.execute(statement)
                except IntegrityError:
                    # Presumably the unique file path already has a row
                    # (e.g. from a previous run), so update that row instead.
                    # NOTE(review): on backends that abort the transaction
                    # after a failed statement, the insert may need wrapping
                    # in a savepoint -- confirm for the databases in use.
                    statement = (
                        update(self.extracted_text_table)
                        .values(**values)
                        .where(
                            self.extracted_text_table.c.crate_file_path
                            == relative_path
                        )
                    )
                    # The update must actually be executed; building the
                    # statement alone changes nothing in the database.
                    connection.execute(statement)

                # Commit after each file.
                connection.commit()

    def generate_matches(
        self,
    ) -> Generator[Tuple[str, int, str, str], None, None]:
        """
        Find files whose names match the expected document filename pattern.

        Yields:
            tuple: ``file_path, row_identifier, document_uid, extension``,
            where ``extension`` is lower case and includes the leading dot.
            Non-matching files are logged and ignored.
        """
        # Groups:
        # 1: RowIdentifier
        # 2: DocumentUID (sometimes incorrectly set to IDOrganisation)
        # 3: Subfolder 1-4
        # 4: Index where document split across files
        # 5: Extension, mixed case
        pattern = re.compile(r"(\d+)_([0-9a-f]+)_(\d+)_(\d+)(\.\S+)")

        for dirpath, filename in self.generate_filenames():
            file_path = os.path.join(dirpath, filename)
            m = pattern.match(filename)
            if m is None:
                log.info(f"Completely ignoring {file_path}")
                continue

            row_identifier = int(m.group(1))
            document_uid = m.group(2)
            extension = m.group(5).lower()
            yield file_path, row_identifier, document_uid, extension