Coverage for nlp_webserver/tasks.py: 58%
129 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
1r"""
2crate_anon/nlp_webserver/tasks.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26Tasks to process text for an NLPRP server (and be scheduled via Celery).
28"""
30from collections import defaultdict
31import datetime
32import json
33import logging
34from typing import TYPE_CHECKING
36from cardinal_pythonlib.httpconst import HttpStatus
37from cardinal_pythonlib.json_utils.typing_helpers import (
38 JsonArrayType,
39 JsonObjectType,
40)
41from celery import Celery
43# from celery.app.task import Task # see "def delay", "def apply_async"
44from cryptography.fernet import Fernet
45import requests
46from sqlalchemy import engine_from_config
47from sqlalchemy.orm import scoped_session
48from sqlalchemy.exc import SQLAlchemyError
49import transaction
51from crate_anon.common.constants import JSON_SEPARATORS_COMPACT
52from crate_anon.nlp_manager.constants import GateApiKeys, GateResultKeys
53from crate_anon.nlp_webserver.models import Session, DocProcRequest
54from crate_anon.nlp_webserver.server_processor import ServerProcessor
55from crate_anon.nlp_webserver.constants import (
56 NlpServerConfigKeys,
57 PROCTYPE_GATE,
58 SQLALCHEMY_COMMON_OPTIONS,
59)
60from crate_anon.nlp_webserver.security import decrypt_password
61from crate_anon.nlp_webserver.settings import SETTINGS
62from crate_anon.nlprp.api import NlprpKeys
64if TYPE_CHECKING:
65 from typing import Optional
log = logging.getLogger(__name__)


# =============================================================================
# Constants
# =============================================================================

# Should each processor's results be returned in the more explicit NLPRP
# dictionary (JSON object) format, grouped by table name, rather than in the
# simpler list (JSON array) format?
USE_DICT_FORMAT_NLPRP_RESULT = True

# The "no results" value matching whichever result format was chosen above.
if USE_DICT_FORMAT_NLPRP_RESULT:
    EMPTY_PROCESSOR_RESULT = {}
else:
    EMPTY_PROCESSOR_RESULT = []
# =============================================================================
# SQLAlchemy and Celery setup
# =============================================================================

# Session registry for the tasks, bound to an engine built from the config
# file's SQLAlchemy settings (those keys starting with the configured prefix).
TaskSession = scoped_session(Session)
engine = engine_from_config(
    SETTINGS,
    NlpServerConfigKeys.SQLALCHEMY_PREFIX,
    **SQLALCHEMY_COMMON_OPTIONS,
)
TaskSession.configure(bind=engine)

# The Celery broker URL is mandatory; the result backend is optional (an empty
# or missing value means "no backend").
try:
    broker_url = SETTINGS[NlpServerConfigKeys.BROKER_URL]
except KeyError:
    log.error(
        f"{NlpServerConfigKeys.BROKER_URL} value missing from config file."
    )
    raise
backend_url = SETTINGS.get(NlpServerConfigKeys.BACKEND_URL) or None

# The encryption key is mandatory too. Report its absence in the same way as a
# missing broker URL, rather than failing with an unexplained KeyError.
try:
    key = SETTINGS[NlpServerConfigKeys.ENCRYPTION_KEY]
except KeyError:
    log.error(
        f"{NlpServerConfigKeys.ENCRYPTION_KEY} value missing from config file."
    )
    raise
# Fernet needs the key as bytes; the config value is a string.
key = key.encode()
CIPHER_SUITE = Fernet(key)

# Celery task results expire after 90 days (expressed in seconds).
expiry_time = 60 * 60 * 24 * 90
celery_app = Celery(
    "tasks", broker=broker_url, backend=backend_url, result_expires=expiry_time
)
celery_app.conf.database_engine_options = SQLALCHEMY_COMMON_OPTIONS
# =============================================================================
# Helper functions
# =============================================================================


def nlprp_processor_dict(
    success: bool,
    processor: ServerProcessor = None,
    results: JsonArrayType = None,
    errcode: int = None,
    errmsg: str = None,
) -> JsonObjectType:
    """
    Returns a dictionary suitable for use as one of the elements of the
    ``response["results"]["processors"]`` array; see :ref:`NLPRP <nlprp>`.

    Args:
        success:
            did the request succeed?
        processor:
            a
            :class:`crate_anon.nlp_webserver.server_processor.ServerProcessor`,
            or ``None``; if given, its name, title and version are included
        results:
            a JSON array (or dict) of results; if empty or ``None``, a new
            empty result (``{}`` or ``[]``, according to
            ``USE_DICT_FORMAT_NLPRP_RESULT``) is used instead
        errcode:
            (if not ``success``) an integer error code
        errmsg:
            (if not ``success``) an error message; used as both the error's
            message and its description

    Returns:
        a JSON object in NLPRP format
    """
    proc_dict = {
        # Copy the empty default: handing out the module-level constant itself
        # would let a caller that mutates one response silently alter every
        # later "empty" response too.
        NlprpKeys.RESULTS: results or EMPTY_PROCESSOR_RESULT.copy(),
        NlprpKeys.SUCCESS: success,
    }
    if processor:
        proc_dict[NlprpKeys.NAME] = processor.name
        proc_dict[NlprpKeys.TITLE] = processor.title
        proc_dict[NlprpKeys.VERSION] = processor.version
    if not success:
        proc_dict[NlprpKeys.ERRORS] = [
            {
                NlprpKeys.CODE: errcode,
                NlprpKeys.MESSAGE: errmsg,
                NlprpKeys.DESCRIPTION: errmsg,
            }
        ]
    return proc_dict
def internal_error(
    msg: str, processor: ServerProcessor = None
) -> JsonObjectType:
    """
    Log an error message, and return an NLPRP processor result describing an
    internal server error (HTTP status 500).

    Nothing is raised: callers use the returned dictionary as the processor's
    result, just like a successful one.

    Args:
        msg: the error message
        processor: the :class:`ServerProcessor` object to be used, if any

    Returns:
        a JSON object in NLPRP format, with ``success`` false and an error
        message of the form ``"Internal Server Error: <msg>"``
    """
    log.error(msg)
    return nlprp_processor_dict(
        success=False,
        processor=processor,
        errcode=HttpStatus.INTERNAL_SERVER_ERROR,
        errmsg=f"Internal Server Error: {msg}",
    )
def gate_api_error(
    msg: str, processor: ServerProcessor = None
) -> JsonObjectType:
    """
    Log a failure in talking to GATE, and build the matching NLPRP failure
    result.

    The result carries HTTP status 502 (Bad Gateway), because the problem lies
    with the upstream GATE service rather than with this server.

    Args:
        msg: what went wrong (a ``"Bad Gateway: "`` prefix is added here)
        processor: the :class:`ServerProcessor` involved, if known

    Returns:
        an NLPRP processor dictionary with ``success`` false
    """
    log.error(f"GATE API error: {msg}")
    failure_code = HttpStatus.BAD_GATEWAY
    failure_text = f"Bad Gateway: {msg}"
    return nlprp_processor_dict(
        False,
        processor=processor,
        errcode=failure_code,
        errmsg=failure_text,
    )
# =============================================================================
# Convert GATE JSON results (from GATE's own API) to our internal format
# =============================================================================


def get_gate_results(results_dict: JsonObjectType) -> JsonArrayType:
    """
    Convert results in GATE JSON format to results in our internal format.

    The input is not modified.

    Args:
        results_dict: see :class:`crate_anon.nlp_manager.constants.GateApiKeys`
            or https://cloud.gate.ac.uk/info/help/online-api.html

    Returns:
        list of dictionaries; see
        :class:`crate_anon.nlp_manager.constants.GateApiKeys`

    Raises:
        KeyError: if ``results_dict`` has no entities entry, or an annotation
            has no indices entry
    """
    results = []  # type: JsonArrayType
    entities = results_dict[GateApiKeys.ENTITIES]
    for annottype, values in entities.items():
        for annotation in values:
            start, end = annotation[GateApiKeys.INDICES]
            # Everything other than the indices is a feature. Build a new dict
            # rather than deleting the indices from the annotation, so the
            # caller's data is left intact.
            features = {
                k: v
                for k, v in annotation.items()
                if k != GateApiKeys.INDICES
            }
            results.append(
                {
                    GateResultKeys.TYPE: annottype,
                    GateResultKeys.START: start,
                    GateResultKeys.END: end,
                    GateResultKeys.SET: None,  # TODO: check what this should be
                    GateResultKeys.FEATURES: features,
                }
            )
    return results
# =============================================================================
# Task session management
# =============================================================================


def start_task_session() -> None:
    """
    Open (or reuse) the tasks' database session. Call this at the start of a
    web request.
    """
    # Calling the scoped_session registry creates the session for the current
    # scope if needed; only that side effect matters, so the session object
    # itself is discarded here.
    TaskSession()
# =============================================================================
# NLP server processing functions
# =============================================================================


# noinspection PyUnusedLocal
@celery_app.task(bind=True, name="tasks.process_nlp_text")
def process_nlp_text(
    self, docprocrequest_id: str, username: str = "", crypt_pass: str = ""
) -> None:
    """
    Task to process a single ``DocProcRequest`` by sending its text to the
    relevant processor.

    The results are stored, JSON-encoded, on the ``DocProcRequest``, which is
    then marked as done. Requests that do not exist, or that have already been
    processed, are logged and skipped. An unknown processor ID produces an
    internal-error result, which is stored like any other result. On a
    database error, the session is rolled back and the error is logged.

    Args:
        self:
            the :class:`celery.Task` (unused)
        docprocrequest_id:
            the :class:`crate_anon.nlp_webserver.models.DocProcRequest` ID
        username:
            username in use
        crypt_pass:
            encrypted password; decrypted here with the module's Fernet cipher
    """
    # noinspection PyUnresolvedReferences
    dpr = TaskSession.query(DocProcRequest).get(
        docprocrequest_id
    )  # type: Optional[DocProcRequest]
    if not dpr:
        log.error(f"DocProcRequest {docprocrequest_id} does not exist")
        # Nothing to process; without this we would dereference None below.
        return
    if dpr.done:
        log.error(f"DocProcRequest {docprocrequest_id} already processed")
        return

    # Turn the password back into bytes and decrypt
    password = decrypt_password(crypt_pass.encode(), CIPHER_SUITE)
    text = dpr.doctext
    processor_id = dpr.processor_id

    try:
        # Only a failed processor lookup means "No such processor". Keep the
        # KeyError handler around the lookup alone, so that KeyErrors raised
        # during the NLP itself are not misreported as an unknown processor.
        try:
            processor = ServerProcessor.processors[processor_id]
        except KeyError:
            results = internal_error(f"No such processor: {processor_id!r}")
        else:
            results = process_nlp_text_immediate(
                text, processor, username, password
            )

        dpr.done = True
        dpr.when_done_utc = datetime.datetime.utcnow()
        dpr.results = json.dumps(results, separators=JSON_SEPARATORS_COMPACT)
        transaction.commit()
    except SQLAlchemyError:
        # Don't fail silently: record why the request was not stored.
        log.exception(
            f"Database error processing DocProcRequest {docprocrequest_id}; "
            f"rolling back"
        )
        # noinspection PyUnresolvedReferences
        TaskSession.rollback()
def process_nlp_text_immediate(
    text: str,
    processor: ServerProcessor,
    username: str = "",
    password: str = "",
) -> JsonObjectType:
    """
    Run NLP over some text right now (synchronously) with the given processor.

    GATE processors are reached via :func:`process_nlp_gate`, using the
    supplied credentials. Any other processor is run in-process via
    :func:`process_nlp_internal`, after making sure its parser has been set
    (via ``set_parser()``).

    Args:
        text:
            text to run the NLP over
        processor:
            NLP processor; a :class:`ServerProcessor`
        username:
            username in use (GATE only)
        password:
            plaintext password (GATE only)

    Returns:
        an NLPRP processor result dictionary (see
        :func:`nlprp_processor_dict`)
    """
    if processor.proctype != PROCTYPE_GATE:
        # Internal CRATE processor: its parser may not exist yet.
        if not processor.parser:
            processor.set_parser()
        return process_nlp_internal(text=text, processor=processor)
    return process_nlp_gate(text, processor, username, password)
def process_nlp_gate(
    text: str, processor: ServerProcessor, username: str, password: str
) -> JsonObjectType:
    """
    Send text to a chosen GATE processor (via an HTTP connection, using the
    GATE JSON API; see https://cloud.gate.ac.uk/info/help/online-api.html).

    Args:
        text:
            text to run the NLP over
        processor:
            NLP processor; a :class:`ServerProcessor`
        username:
            username in use
        password:
            plaintext password

    Returns:
        an NLPRP processor result dictionary. On success, its results are
        the GATE annotations as converted by :func:`get_gate_results`.

    API failure is handled by returning a failure code/message to our client
    (via :func:`gate_api_error`), rather than by raising. This covers failure
    to connect, a non-OK HTTP status, a non-JSON reply, and JSON of an
    unexpected shape.
    """
    headers = {
        "Content-Type": "text/plain",
        "Accept": "application/gate+json",
        # Content-Encoding: gzip?,
        "Expect": "100-continue",
        # ... see https://cloud.gate.ac.uk/info/help/online-api.html
        "charset": "utf8",
    }
    url = processor.base_url + "/" + processor.name
    try:
        response = requests.post(
            url,
            data=text.encode("utf-8"),
            headers=headers,
            auth=(username, password),  # HTTP basic auth
        )
    except requests.exceptions.RequestException as e:
        # Connection failures, timeouts, etc. have no HTTP response attached
        # (e.response is None), so we must not assume one exists.
        if e.response is None:
            errmsg = f"Could not communicate with the GATE processor: {e}"
        else:
            errmsg = (
                f"The GATE processor returned the error: {e.response.reason} "
                f"(with status code {e.response.status_code})"
            )
        return gate_api_error(errmsg, processor=processor)
    if response.status_code != HttpStatus.OK:
        return gate_api_error(
            f"The GATE processor returned the error: {response.reason} "
            f"(with status code {response.status_code})",
            processor=processor,
        )
    try:
        json_response = response.json()
    except ValueError:
        # JSON decoding errors (stdlib json's, and requests' own
        # JSONDecodeError) all derive from ValueError. No "Bad Gateway:"
        # prefix here; gate_api_error adds it.
        return gate_api_error(
            "The GATE processor did not return JSON",
            processor=processor,
        )
    try:
        results = get_gate_results(json_response)
    except (AttributeError, KeyError, TypeError, ValueError) as e:
        # Valid JSON, but not the structure GATE's API should return.
        return gate_api_error(
            f"The GATE processor returned JSON in an unexpected format: "
            f"{e!r}",
            processor=processor,
        )
    return nlprp_processor_dict(
        success=True, processor=processor, results=results
    )
def process_nlp_internal(
    text: str, processor: ServerProcessor
) -> JsonObjectType:
    """
    Send text to a chosen CRATE Python NLP processor, and return an NLPRP
    processor result dictionary (see :func:`nlprp_processor_dict`).

    Results are grouped by table name (a dict) if
    ``USE_DICT_FORMAT_NLPRP_RESULT`` is set, or else given as a flat list.
    In list format, results from more than one table are an internal error.

    Args:
        text:
            The text to process.
        processor:
            The NLP processor to use.

    Returns:
        an NLPRP processor result dictionary. If the processor's parser has no
        ``parse`` method, this is an internal-error result for this processor.
    """
    parser = processor.parser
    try:
        tablename_valuedict_generator = parser.parse(text)
    except AttributeError:
        # Pass the processor so that the error result identifies it (name,
        # title, version), as the GATE error paths do.
        return internal_error(
            f"parser is not a CRATE Python NLP parser; is {parser!r}",
            processor=processor,
        )
    if USE_DICT_FORMAT_NLPRP_RESULT:
        results = defaultdict(list)
        for tablename, tableresult in tablename_valuedict_generator:
            results[tablename].append(tableresult)
        # If the defaultdict(list) is returned unmodified, the default JSON
        # serialiser was seen to mess up and produce things like {'results':
        # defaultdict(<class 'list'>, {'alcoholunits': [{'variable_name': ...
        # Building with a defaultdict is convenient, so convert to a plain
        # dict here:
        results = dict(results)
    else:
        # Keep only the second element of each parsed item; the first is the
        # table name, which has no meaning in this format.
        results = []
        tables_seen = set()
        for tablename, tableresult in tablename_valuedict_generator:
            results.append(tableresult)
            tables_seen.add(tablename)
            assert len(tables_seen) < 2, (
                "Internal bug: can't return results from multiple tables "
                "(within a single NLP processor) in single-table list format"
            )
    return nlprp_processor_dict(True, processor, results=results)