Coverage for preprocess/text_extractor.py: 97%
117 statements
« prev ^ index » next coverage.py v7.8.0, created at 2026-02-06 03:32 -0600
« prev ^ index » next coverage.py v7.8.0, created at 2026-02-06 03:32 -0600
1"""
2crate_anon/preprocess/text_extractor.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Extract text from a document store prior to anonymisation.**
28"""
30import logging
31import os
32from pathlib import Path
33import re
34import traceback
35from typing import Generator, Optional, Tuple
37from cardinal_pythonlib.extract_text import (
38 document_to_text,
39 ext_map,
40 TextProcessingConfig,
41)
42from cardinal_pythonlib.sqlalchemy.schema import (
43 make_bigint_autoincrement_column,
44)
45from pendulum import DateTime as Pendulum
46from sqlalchemy import (
47 BigInteger,
48 Column,
49 DateTime,
50 insert,
51 select,
52 String,
53 Table,
54 UnicodeText,
55 update,
56)
57from sqlalchemy.engine.base import Engine
58from sqlalchemy.exc import IntegrityError, MultipleResultsFound, NoResultFound
59from sqlalchemy.sql.schema import MetaData
61from crate_anon.anonymise.constants import AnonymiseConfigDefaults
62from crate_anon.common.sql import add_indexes, IndexCreationInfo
64from crate_anon.preprocess.constants import (
65 CRATE_COL_FILE_PATH,
66 CRATE_COL_PK,
67 CRATE_COL_TEXT,
68 CRATE_COL_TEXT_LAST_EXTRACTED,
69 CRATE_IDX_PREFIX,
70 CRATE_TABLE_EXTRACTED_TEXT,
71)
72from crate_anon.preprocess.systmone_ddgen import (
73 contextual_tablename,
74 S1GenericCol,
75 S1Table,
76 SystmOneContext,
77)
79log = logging.getLogger(__name__)
class TextExtractor:
    """
    Base class for extracting text from the files under a root directory
    and storing the result in the ``CRATE_TABLE_EXTRACTED_TEXT`` table.

    Derived classes must provide ``process_files()``,
    ``get_table_definition()`` and ``indexed_column_names``.
    """

    def __init__(
        self,
        engine: Engine,
        metadata: MetaData,
        context: SystmOneContext,
        root_directory: str,
        drop_table: bool = False,
        plain: bool = AnonymiseConfigDefaults.EXTRACT_TEXT_PLAIN,
        width: int = AnonymiseConfigDefaults.EXTRACT_TEXT_WIDTH,
    ) -> None:
        """
        Args:
            engine:
                SQLAlchemy engine used to create, drop and index the
                extracted-text table.
            metadata:
                SQLAlchemy metadata in which the extracted-text table is
                looked up and (re)defined.
            context:
                SystmOne context; stored for use by derived classes.
            root_directory:
                Directory that is walked recursively to find files.
            drop_table:
                If true, an existing extracted-text table is dropped and
                recreated by :meth:`create_table`.
            plain:
                Passed as ``plain`` to ``TextProcessingConfig``.
            width:
                Passed as ``width`` to ``TextProcessingConfig``.
        """
        self.engine = engine
        self.metadata = metadata
        self.context = context
        self.root_directory = root_directory
        self.drop_table = drop_table
        self.plain = plain
        self.width = width

        # Extensions we can convert are the keys of ext_map. Its ``None``
        # key is not a real extension (presumably a fallback entry), so it
        # is filtered out; filtering also copes if that key is absent.
        self.extensions = [ext for ext in ext_map if ext is not None]

    def extract_all(self) -> None:
        """
        Ensure the destination table exists, then process every file.
        """
        self.create_table()
        self.process_files()

    def create_table(self) -> None:
        """
        Create (and index) the extracted-text table if it is not already
        known to the metadata. If ``drop_table`` is set, drop and recreate
        an existing table.
        """
        existing = self.metadata.tables.get(CRATE_TABLE_EXTRACTED_TEXT)
        self.extracted_text_table = existing

        if existing is not None and self.drop_table:
            # Table.drop() needs an explicit bind.
            existing.drop(self.engine, checkfirst=True)
            # Forget the old definition; otherwise defining a table with
            # the same name on this MetaData raises "already defined".
            self.metadata.remove(existing)
            existing = None

        if existing is None:
            self.extracted_text_table = self.get_table_definition()
            self.extracted_text_table.create(self.engine, checkfirst=True)
            self.index_table()

    def generate_filenames(self) -> Generator[Tuple[str, str], None, None]:
        """
        Walk ``root_directory`` recursively.

        Yields:
            ``(dirpath, filename)`` for every file found.
        """
        log.info(f"Extracting text from {self.root_directory}...")
        for dirpath, _dirnames, filenames in os.walk(self.root_directory):
            log.debug(f"Processing {dirpath}")
            for filename in filenames:
                yield dirpath, filename

    def index_table(self) -> None:
        """
        Add a non-unique index, named ``{CRATE_IDX_PREFIX}_{column}``, for
        each column of the extracted-text table that is listed in
        ``indexed_column_names``.
        """
        for column in self.extracted_text_table.columns:
            colname = column.name
            if colname not in self.indexed_column_names:
                continue

            index_info = IndexCreationInfo(
                index_name=f"{CRATE_IDX_PREFIX}_{colname}",
                column=colname,
                unique=False,
            )
            add_indexes(self.engine, self.extracted_text_table, [index_info])

    def process_files(self) -> None:
        """
        Extract text from the files and store it. Implemented by derived
        classes.
        """
        raise NotImplementedError(
            "Implement 'process_files()' in derived class!"
        )

    @property
    def indexed_column_names(self) -> list[str]:
        """
        Names of the columns to be indexed by :meth:`index_table`.
        Implemented by derived classes.
        """
        raise NotImplementedError(
            "Implement 'indexed_column_names' property in derived class!"
        )

    def get_table_definition(self) -> Table:
        """
        Return the SQLAlchemy definition of the extracted-text table.
        Implemented by derived classes.
        """
        raise NotImplementedError(
            "Implement 'get_table_definition()' in derived class!"
        )

    def extract_text_from_file(
        self, full_path: str, extension: str
    ) -> Tuple[Optional[str], Optional[Pendulum]]:
        """
        Try to extract the text from a single file.

        Args:
            full_path:
                Path of the file to convert.
            extension:
                File extension, compared as-is against the supported
                extensions (callers are expected to normalize case).

        Returns:
            ``(text, last_extracted)``. Both are ``None`` if the extension
            is unsupported, the conversion raised an exception, or the
            conversion produced no text. Otherwise ``last_extracted`` is
            the time at which extraction completed.
        """
        # TODO: Read last_extracted and only update if None (or do
        # something clever by checking when the file was last written)
        if extension not in self.extensions:
            log.info(f"... unsupported file extension '{extension}'.")
            return None, None

        log.info("... extracting text...")
        try:
            config = TextProcessingConfig(width=self.width, plain=self.plain)
            text = document_to_text(filename=full_path, config=config)
            log.info("... extracted.")
        except Exception as e:
            # Deliberately best-effort: one unreadable document must not
            # stop the whole run.
            traceback.print_exc()
            log.error(f"... caught exception from document_to_text: {e}")
            return None, None

        if text is None:
            return None, None

        return text, Pendulum.now()
class SystmOneTextExtractor(TextExtractor):
    """
    Text extractor for a SystmOne document store.

    Files are matched to rows of the SystmOne documents table through the
    ``RowIdentifier`` encoded at the start of each filename.
    """

    # Columns of the extracted-text table that receive an index.
    indexed_column_names = [
        CRATE_COL_PK,
        S1GenericCol.ROW_ID,
        S1GenericCol.PATIENT_ID,
    ]

    # Filename pattern, compiled once. Groups:
    # 1: RowIdentifier
    # 2: DocumentUID (sometimes incorrectly set to IDOrganisation)
    # 3: Subfolder 1-4
    # 4: Index where document split across files
    # 5: Extension, mixed case
    _FILENAME_REGEX = re.compile(r"(\d+)_([0-9a-f]+)_(\d+)_(\d+)(\.\S+)")

    def __init__(self, *args, **kwargs) -> None:
        """
        Accepts the same arguments as :class:`TextExtractor`.

        Raises:
            KeyError: if the SystmOne documents table for this context is
                not present in the metadata.
        """
        super().__init__(*args, **kwargs)
        self.documents_table = self.metadata.tables[
            contextual_tablename(S1Table.DOCUMENTS, self.context)
        ]

    def get_table_definition(self) -> Table:
        """
        Return the definition of the extracted-text table: an
        autoincrementing PK, identifiers copied from the documents table,
        the (unique) file path, the extracted text and the time of
        extraction.
        """
        return Table(
            CRATE_TABLE_EXTRACTED_TEXT,
            self.metadata,
            make_bigint_autoincrement_column(CRATE_COL_PK),
            Column(
                S1GenericCol.ROW_ID,
                BigInteger,
                comment="FK to S1_Documents",
                nullable=False,
            ),
            Column(
                S1GenericCol.PATIENT_ID,
                BigInteger,
                comment="Patient ID from S1_Documents",
                nullable=False,
            ),
            Column(
                "DocumentUID",
                String(16),
                comment="Unique ID of document",
                nullable=False,
            ),
            Column(
                CRATE_COL_FILE_PATH,
                String(255),
                comment="Path relative to docstore",
                unique=True,
            ),
            Column(
                CRATE_COL_TEXT,
                UnicodeText,
                comment="Extracted text from file",
            ),
            Column(
                CRATE_COL_TEXT_LAST_EXTRACTED,
                DateTime,
                comment="Date/time text was last extracted",
            ),
        )

    def process_files(self) -> None:
        """
        For each matching file, look up its row in the documents table,
        extract its text and store the result in the extracted-text table.

        A file whose path is already present is updated rather than
        inserted. Files with zero or several matching document rows are
        logged and skipped. Each file is committed individually.
        """
        with self.engine.connect() as connection:
            for (
                full_path,
                row_identifier,
                document_uid,
                extension,
            ) in self.generate_matches():
                log.info(f"Processing {full_path}...")
                lookup = select(self.documents_table).where(
                    self.documents_table.c.RowIdentifier == row_identifier
                )
                try:
                    row = connection.execute(lookup).one()
                except NoResultFound:
                    log.error(
                        f"... no row found for RowIdentifier: {row_identifier}"
                    )
                    continue
                except MultipleResultsFound:
                    log.error(
                        "... multiple rows found with RowIdentifier: "
                        f"{row_identifier}"
                    )
                    continue

                id_patient = row._mapping[S1GenericCol.PATIENT_ID]
                text, last_extracted = self.extract_text_from_file(
                    full_path, extension
                )

                # Stored path is the last two components of the full path
                # (containing folder + filename).
                relative_path = str(Path(*Path(full_path).parts[-2:]))
                values = dict(
                    RowIdentifier=row_identifier,
                    DocumentUID=document_uid,
                    IDPatient=id_patient,
                    crate_file_path=relative_path,
                    crate_text=text,
                    crate_text_last_extracted=last_extracted,
                )

                try:
                    connection.execute(
                        insert(self.extracted_text_table).values(**values)
                    )
                except IntegrityError:
                    # Most likely the unique file path already exists, so
                    # fall back to updating that row. Work is committed
                    # per file, so this rollback only discards the failed
                    # INSERT; it is needed on backends that abort the
                    # transaction after an error.
                    connection.rollback()
                    connection.execute(
                        update(self.extracted_text_table)
                        .values(**values)
                        .where(
                            self.extracted_text_table.c.crate_file_path
                            == relative_path
                        )
                    )
                connection.commit()

    def generate_matches(
        self,
    ) -> Generator[Tuple[str, int, str, str], None, None]:
        """
        Find files whose names follow the SystmOne document naming
        pattern.

        Yields:
            ``(file_path, row_identifier, document_uid, extension)`` for
            each matching file, with the extension (including the leading
            dot) converted to lower case. Non-matching files are logged
            and skipped.
        """
        for dirpath, filename in self.generate_filenames():
            file_path = os.path.join(dirpath, filename)
            m = self._FILENAME_REGEX.match(filename)
            if m is None:
                log.info(f"Completely ignoring {file_path}")
                continue

            row_identifier = int(m.group(1))
            document_uid = m.group(2)
            extension = m.group(5).lower()
            yield file_path, row_identifier, document_uid, extension