Coverage for nlp_webserver/tasks.py: 58%

129 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-27 10:34 -0500

1r""" 

2crate_anon/nlp_webserver/tasks.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26Tasks to process text for an NLPRP server (and be scheduled via Celery). 

27 

28""" 

29 

30from collections import defaultdict 

31import datetime 

32import json 

33import logging 

34from typing import TYPE_CHECKING 

35 

36from cardinal_pythonlib.httpconst import HttpStatus 

37from cardinal_pythonlib.json_utils.typing_helpers import ( 

38 JsonArrayType, 

39 JsonObjectType, 

40) 

41from celery import Celery 

42 

43# from celery.app.task import Task # see "def delay", "def apply_async" 

44from cryptography.fernet import Fernet 

45import requests 

46from sqlalchemy import engine_from_config 

47from sqlalchemy.orm import scoped_session 

48from sqlalchemy.exc import SQLAlchemyError 

49import transaction 

50 

51from crate_anon.common.constants import JSON_SEPARATORS_COMPACT 

52from crate_anon.nlp_manager.constants import GateApiKeys, GateResultKeys 

53from crate_anon.nlp_webserver.models import Session, DocProcRequest 

54from crate_anon.nlp_webserver.server_processor import ServerProcessor 

55from crate_anon.nlp_webserver.constants import ( 

56 NlpServerConfigKeys, 

57 PROCTYPE_GATE, 

58 SQLALCHEMY_COMMON_OPTIONS, 

59) 

60from crate_anon.nlp_webserver.security import decrypt_password 

61from crate_anon.nlp_webserver.settings import SETTINGS 

62from crate_anon.nlprp.api import NlprpKeys 

63 

if TYPE_CHECKING:
    # Imported for static type checkers only; in the code visible here,
    # Optional is used solely inside a "# type:" comment.
    from typing import Optional

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

68 

69 

70# ============================================================================= 

71# Constants 

72# ============================================================================= 

73 

# Use the more explicit NLPRP dictionary (JSON object) result format, rather
# than the simpler list (JSON array) format? This flag selects the branch
# taken in process_nlp_internal() and the type of EMPTY_PROCESSOR_RESULT.
USE_DICT_FORMAT_NLPRP_RESULT = True

# Following on from that: the value used by nlprp_processor_dict() when there
# are no results -- an empty dict in dictionary format, an empty list in list
# format. NB this is a single module-level (mutable) object.
EMPTY_PROCESSOR_RESULT = {} if USE_DICT_FORMAT_NLPRP_RESULT else []

80 

81 

82# ============================================================================= 

83# SQLAlchemy and Celery setup 

84# ============================================================================= 

85 

# Session registry used by the Celery tasks, bound to an engine that is built
# from the server settings (keys starting with the SQLAlchemy prefix) plus the
# options common to all CRATE NLP web server engines.
TaskSession = scoped_session(Session)
engine = engine_from_config(
    SETTINGS,
    NlpServerConfigKeys.SQLALCHEMY_PREFIX,
    **SQLALCHEMY_COMMON_OPTIONS,
)
TaskSession.configure(bind=engine)

# The broker URL is mandatory: if it is absent, log the problem and re-raise
# the KeyError, so importing this module fails.
try:
    broker_url = SETTINGS[NlpServerConfigKeys.BROKER_URL]
except KeyError:
    log.error(
        f"{NlpServerConfigKeys.BROKER_URL} value " f"missing from config file."
    )
    raise
# The backend URL is optional: a missing or falsy (e.g. empty) setting becomes
# None.
backend_url = SETTINGS.get(NlpServerConfigKeys.BACKEND_URL) or None

# The encryption key is also looked up unconditionally; unlike the broker URL,
# a missing key raises KeyError without a dedicated log message.
key = SETTINGS[NlpServerConfigKeys.ENCRYPTION_KEY]
# Turn key into bytes object
key = key.encode()
# Cipher passed to decrypt_password() by process_nlp_text().
CIPHER_SUITE = Fernet(key)

# Set expiry time to 90 days in seconds
expiry_time = 60 * 60 * 24 * 90
# The Celery application whose task decorator registers process_nlp_text().
celery_app = Celery(
    "tasks", broker=broker_url, backend=backend_url, result_expires=expiry_time
)
celery_app.conf.database_engine_options = SQLALCHEMY_COMMON_OPTIONS

114 

115 

116# ============================================================================= 

117# Helper functions 

118# ============================================================================= 

119 

120 

def nlprp_processor_dict(
    success: bool,
    processor: ServerProcessor = None,
    results: JsonArrayType = None,
    errcode: int = None,
    errmsg: str = None,
) -> JsonObjectType:
    """
    Returns a dictionary suitable for use as one of the elements of the
    ``response["results"]["processors"]`` array; see :ref:`NLPRP <nlprp>`.

    Args:
        success:
            did the request succeed?
        processor:
            the processor that was used (its ``name``, ``title`` and
            ``version`` are copied into the result), or ``None`` to omit
            those keys
        results:
            a JSON array (or dict) of results; if empty or ``None``, a fresh
            empty container of the configured NLPRP result format is used
        errcode:
            (if not ``success``) an integer error code
        errmsg:
            (if not ``success``) an error message; used for both the
            "message" and the "description" fields of the error

    Returns:
        a JSON object in NLPRP format
    """
    proc_dict = {
        # Copy the module-level empty placeholder rather than handing it out
        # directly: otherwise every "empty" response would share one mutable
        # object, and a caller modifying its result would corrupt all others.
        NlprpKeys.RESULTS: results or EMPTY_PROCESSOR_RESULT.copy(),
        NlprpKeys.SUCCESS: success,
    }
    if processor:
        proc_dict[NlprpKeys.NAME] = processor.name
        proc_dict[NlprpKeys.TITLE] = processor.title
        proc_dict[NlprpKeys.VERSION] = processor.version
    if not success:
        # A list containing a single error object.
        proc_dict[NlprpKeys.ERRORS] = [
            {
                NlprpKeys.CODE: errcode,
                NlprpKeys.MESSAGE: errmsg,
                NlprpKeys.DESCRIPTION: errmsg,
            }
        ]
    return proc_dict

164 

165 

def internal_error(
    msg: str, processor: ServerProcessor = None
) -> JsonObjectType:
    """
    Log an error message, and return (not raise) the corresponding NLPRP
    processor dictionary describing an internal server error.

    Args:
        msg: the error message
        processor: the processor concerned, if any

    Returns:
        a failure dictionary as built by :func:`nlprp_processor_dict`
    """
    log.error(msg)
    failure_details = dict(
        success=False,
        processor=processor,
        errcode=HttpStatus.INTERNAL_SERVER_ERROR,
        errmsg=f"Internal Server Error: {msg}",
    )
    return nlprp_processor_dict(**failure_details)

184 

185 

def gate_api_error(
    msg: str, processor: ServerProcessor = None
) -> JsonObjectType:
    """
    Log a GATE API failure and return a "GATE failed" (bad gateway) error
    dictionary.

    Args:
        msg: description of the error
        processor: the processor concerned, if any

    Returns:
        a failure dictionary as built by :func:`nlprp_processor_dict`
    """
    log.error(f"GATE API error: {msg}")
    message_for_client = f"Bad Gateway: {msg}"
    return nlprp_processor_dict(
        success=False,
        processor=processor,
        errcode=HttpStatus.BAD_GATEWAY,
        errmsg=message_for_client,
    )

203 

204 

205# ============================================================================= 

206# Convert GATE JSON results (from GATE's own API) to our internal format 

207# ============================================================================= 

208 

209 

def get_gate_results(results_dict: JsonObjectType) -> JsonArrayType:
    """
    Convert results in GATE JSON format to results in our internal format.

    The input is not modified: each annotation's features are copied (minus
    the indices) rather than edited in place.

    Args:
        results_dict: see :class:`crate_anon.nlp_manager.constants.GateApiKeys`
            or https://cloud.gate.ac.uk/info/help/online-api.html

    Returns:
        list of dictionaries; see
        :class:`crate_anon.nlp_manager.constants.GateApiKeys`

    Raises:
        KeyError: if the "entities" key, or an annotation's "indices" key, is
            missing
    """
    results = []  # type: JsonArrayType
    entities = results_dict[GateApiKeys.ENTITIES]
    for annottype, values in entities.items():
        for annotation in values:
            start, end = annotation[GateApiKeys.INDICES]
            # Everything except the indices counts as a feature. Build a new
            # dictionary so the caller's parsed JSON is left untouched.
            features = {
                k: v
                for k, v in annotation.items()
                if k != GateApiKeys.INDICES
            }
            results.append(
                {
                    GateResultKeys.TYPE: annottype,
                    GateResultKeys.START: start,
                    GateResultKeys.END: end,
                    GateResultKeys.SET: None,  # CHECK WHAT THIS SHOULD BE!!
                    GateResultKeys.FEATURES: features,
                }
            )
    return results

238 

239 

240# ============================================================================= 

241# Task session management 

242# ============================================================================= 

243 

244 

def start_task_session() -> None:
    """
    Starts a session for the tasks, by calling the module-level
    ``TaskSession`` registry. To be called at the start of a web request.
    """
    # Called for its effect on the registry; the return value is not needed.
    TaskSession()

250 

251 

252# ============================================================================= 

253# NLP server processing functions 

254# ============================================================================= 

255 

256 

# noinspection PyUnusedLocal
@celery_app.task(bind=True, name="tasks.process_nlp_text")
def process_nlp_text(
    self, docprocrequest_id: str, username: str = "", crypt_pass: str = ""
) -> None:
    """
    Task to process a single ``DocProcRequest`` by sending text to the relevant
    processor, then storing the JSON-encoded result on the request and marking
    it as done.

    If the request does not exist, or has already been processed, an error is
    logged and nothing else happens.

    Args:
        self:
            the :class:`celery.Task`
        docprocrequest_id:
            the :class:`crate_anon.nlp_webserver.models.DocProcRequest` ID
        username:
            username in use
        crypt_pass:
            encrypted password
    """

    # noinspection PyUnresolvedReferences
    dpr = TaskSession.query(DocProcRequest).get(
        docprocrequest_id
    )  # type: Optional[DocProcRequest]
    if not dpr:
        log.error(f"DocProcRequest {docprocrequest_id} does not exist")
        # Nothing to process. Without this return, the attribute access below
        # would raise AttributeError on None.
        return
    if dpr.done:
        log.error(f"DocProcRequest {docprocrequest_id} already processed")
        return

    # Turn the password back into bytes and decrypt
    password = decrypt_password(crypt_pass.encode(), CIPHER_SUITE)
    text = dpr.doctext

    # Get the processor
    processor_id = dpr.processor_id

    try:
        # Fetch the processor
        try:
            processor = ServerProcessor.processors[processor_id]  # may raise
            # Run the NLP
            results = process_nlp_text_immediate(
                text, processor, username, password
            )
        except KeyError:
            # NOTE(review): this also catches a KeyError escaping from the
            # NLP call itself, which would then be reported as a missing
            # processor.
            results = internal_error(f"No such processor: {processor_id!r}")

        dpr.done = True
        dpr.when_done_utc = datetime.datetime.utcnow()
        dpr.results = json.dumps(results, separators=JSON_SEPARATORS_COMPACT)
        transaction.commit()
    except SQLAlchemyError:
        # Record the failure (with traceback) rather than discarding it
        # silently, then undo any partial changes.
        log.exception(
            f"Database error while processing DocProcRequest "
            f"{docprocrequest_id}; rolling back"
        )
        # noinspection PyUnresolvedReferences
        TaskSession.rollback()

313 

314 

def process_nlp_text_immediate(
    text: str,
    processor: ServerProcessor,
    username: str = "",
    password: str = "",
) -> JsonObjectType:
    """
    Run NLP over some text straight away, dispatching on the processor type.

    Args:
        text:
            text to run the NLP over
        processor:
            NLP processor to use
        username:
            username in use (passed on for GATE processors only)
        password:
            plaintext password (passed on for GATE processors only)

    Returns:
        the dictionary returned by :func:`process_nlp_gate` or
        :func:`process_nlp_internal`
    """
    if processor.proctype != PROCTYPE_GATE:
        # Not a GATE processor: make sure the processor has a parser before
        # handing over to the internal route.
        if not processor.parser:
            processor.set_parser()
        return process_nlp_internal(text=text, processor=processor)

    # GATE processor: the credentials are forwarded to the remote service.
    return process_nlp_gate(text, processor, username, password)

343 

344 

def process_nlp_gate(
    text: str, processor: ServerProcessor, username: str, password: str
) -> JsonObjectType:
    """
    Send text to a chosen GATE processor (via an HTTP connection, using the
    GATE JSON API; see https://cloud.gate.ac.uk/info/help/online-api.html).

    Args:
        text:
            text to run the NLP over
        processor:
            NLP processor to use; its ``base_url`` and ``name`` form the URL
        username:
            username in use (for HTTP basic authentication)
        password:
            plaintext password (for HTTP basic authentication)

    Returns:
        an NLPRP processor dictionary, as built by
        :func:`nlprp_processor_dict`

    API failure is handled by returning a failure code/message to our client.
    That covers: failure to obtain any HTTP response, a non-OK HTTP status, a
    body that is not JSON, and JSON that is not in the expected GATE format.
    """
    headers = {
        "Content-Type": "text/plain",
        "Accept": "application/gate+json",
        # Content-Encoding: gzip?,
        "Expect": "100-continue",
        # ... see https://cloud.gate.ac.uk/info/help/online-api.html
        "charset": "utf8",
    }
    try:
        # NOTE(review): no timeout is given, so a stalled GATE server can
        # block the worker indefinitely -- consider a configurable timeout.
        response = requests.post(
            processor.base_url + "/" + processor.name,
            data=text.encode("utf-8"),
            headers=headers,
            auth=(username, password),
        )  # basic auth
    except requests.exceptions.RequestException as e:
        # When no HTTP response was received at all (connection refused, DNS
        # failure, timeout...), e.response is None. Compare against None
        # explicitly: a requests Response is itself falsy for error statuses.
        failed_response = e.response
        if failed_response is None:
            errmsg = f"The GATE processor could not be contacted: {e}"
        else:
            errmsg = (
                f"The GATE processor returned the error: "
                f"{failed_response.reason} "
                f"(with status code {failed_response.status_code})"
            )
        return gate_api_error(errmsg, processor=processor)
    if response.status_code != HttpStatus.OK:
        return gate_api_error(
            f"The GATE processor returned the error: {response.reason} "
            f"(with status code {response.status_code})",
            processor=processor,
        )
    try:
        json_response = response.json()
    except ValueError:
        # ValueError is the common base of the JSON decoding errors that
        # requests can raise (stdlib json or simplejson flavours).
        # gate_api_error() adds the "Bad Gateway: " prefix itself.
        return gate_api_error(
            "The GATE processor did not return JSON",
            processor=processor,
        )
    try:
        results = get_gate_results(json_response)
    except (AttributeError, KeyError, TypeError, ValueError) as e:
        # Valid JSON, but not shaped like a GATE API response.
        return gate_api_error(
            f"The GATE processor returned JSON in an unexpected format: "
            f"{e!r}",
            processor=processor,
        )
    return nlprp_processor_dict(
        success=True, processor=processor, results=results
    )

405 

406 

def process_nlp_internal(
    text: str, processor: ServerProcessor
) -> JsonObjectType:
    """
    Send text to a chosen CRATE Python NLP processor and return an NLPRP
    processor dictionary (see :func:`nlprp_processor_dict`).

    Args:
        text:
            The text to process.
        processor:
            The NLP processor to use; its ``parser`` attribute does the work.

    Returns:
        a success dictionary containing the results, or an internal-error
        dictionary if the processor's parser cannot parse.
    """
    parser = processor.parser
    try:
        tablename_valuedict_generator = parser.parse(text)
    except AttributeError:
        # Pass the processor too, so that the error entry identifies which
        # processor failed (as the GATE error path does).
        return internal_error(
            f"parser is not a CRATE Python NLP parser; is {parser!r}",
            processor=processor,
        )
    if USE_DICT_FORMAT_NLPRP_RESULT:
        results = defaultdict(list)
        for tablename, tableresult in tablename_valuedict_generator:
            results[tablename].append(tableresult)
        # If we use defaultdict(list) here, unmodified, the default JSON
        # serialiser messes up and produces things like {'results':
        # defaultdict(<class 'list'>, {'alcoholunits': [{'variable_name': ...
        # The code above is probably quicker as an iterator, but we can convert
        # here:
        results = dict(results)
    else:
        # Get second element of each element in parsed text as first is
        # tablename which will have no meaning here
        results = []
        tables_seen = set()
        for tablename, tableresult in tablename_valuedict_generator:
            results.append(tableresult)
            tables_seen.add(tablename)
        assert len(tables_seen) < 2, (
            "Internal bug: can't return results from multiple tables (within "
            "a single NLP processor) in single-table list format"
        )
    return nlprp_processor_dict(True, processor, results=results)