Coverage for nlp_manager/cloud_request.py: 63%
453 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
1"""
2crate_anon/nlp_manager/cloud_request.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26This module is for sending JSON requests to the NLP Cloud server and
27receiving responses.
29"""
31from copy import copy
32from http.cookiejar import CookieJar
33import json
34import logging
35import sys
36from typing import Any, Dict, List, Tuple, Generator, Optional, TYPE_CHECKING
37import time
38from sqlalchemy.exc import DatabaseError
40from cardinal_pythonlib.compression import gzip_string
41from cardinal_pythonlib.rate_limiting import rate_limited
42from cardinal_pythonlib.json_utils.typing_helpers import (
43 JsonArrayType,
44 JsonObjectType,
45 JsonValueType,
46)
47from cardinal_pythonlib.dicts import (
48 rename_keys_in_dict,
49 set_null_values_in_dict,
50)
51from cardinal_pythonlib.httpconst import HttpStatus
52from cardinal_pythonlib.timing import MultiTimerContext, timer
53from requests import post, Response
54from requests.exceptions import HTTPError, RequestException
55from semantic_version import Version
56from urllib3.exceptions import NewConnectionError
58from crate_anon.common.constants import (
59 JSON_INDENT,
60 JSON_SEPARATORS_COMPACT,
61 NoneType,
62)
63from crate_anon.common.memsize import getsize
64from crate_anon.common.stringfunc import does_text_contain_word_chars
65from crate_anon.nlp_manager.cloud_parser import Cloud
66from crate_anon.nlp_manager.constants import (
67 FN_NLPDEF,
68 FN_SRCPKSTR,
69 FN_SRCPKVAL,
70 FN_WHEN_FETCHED,
71 GateFieldNames,
72 GateResultKeys,
73 NlpConfigPrefixes,
74 NlpDefValues,
75 full_sectionname,
76)
77from crate_anon.nlp_manager.models import FN_SRCHASH
78from crate_anon.nlp_manager.nlp_definition import NlpDefinition
80from crate_anon.nlprp.api import (
81 json_get_array,
82 json_get_int,
83 make_nlprp_dict,
84 make_nlprp_request,
85 nlprp_datetime_to_datetime_utc_no_tzinfo,
86)
87from crate_anon.nlprp.constants import (
88 NlprpCommands,
89 NlprpKeys,
90 NlprpValues,
91 NlprpVersions,
92)
93from crate_anon.nlp_webserver.server_processor import ServerProcessor
95if TYPE_CHECKING:
96 from crate_anon.nlp_manager.cloud_run_info import CloudRunInfo
# Logger for this module.
log = logging.getLogger(name=__name__)

# Timer name; not used in the code visible here. NOTE(review): presumably
# used with MultiTimerContext/timer to time SQL inserts of results -- confirm.
TIMING_INSERT = "CloudRequest_sql_insert"
104# =============================================================================
105# Helper functions
106# =============================================================================
def utf8len(text: str) -> int:
    """
    Return the number of bytes ``text`` occupies when encoded as UTF-8.

    This can exceed ``len(text)`` when the text contains non-ASCII
    characters.
    """
    encoded = text.encode("utf-8")
    return len(encoded)
def to_json_str(json_structure: JsonValueType) -> str:
    """
    Serialize ``json_structure`` to a compact JSON string.

    Values that JSON cannot represent natively (for example, datetimes in the
    document metadata) are converted with :func:`str` instead of raising
    ``TypeError``.
    """
    return json.dumps(
        json_structure,
        separators=JSON_SEPARATORS_COMPACT,
        default=str,
    )
def report_processor_errors(processor_data: Dict[str, Any]) -> None:
    """
    Log, as a single error message, the errors that a remote NLP processor
    reported for one document.

    Call this only when the processor has indicated failure.

    Args:
        processor_data:
            one processor's NLPRP result. It must contain the name, version
            and errors keys. Each error must have code, message and
            description keys.
    """
    proc_name = processor_data[NlprpKeys.NAME]
    proc_version = processor_data[NlprpKeys.VERSION]
    details = [
        f"{err[NlprpKeys.CODE]} - {err[NlprpKeys.MESSAGE]}: "
        f"{err[NlprpKeys.DESCRIPTION]}"
        for err in processor_data[NlprpKeys.ERRORS]
    ]
    joined_details = "\n".join(details)
    log.error(
        f"Processor {proc_name!r} (version {proc_version}) failed for this "
        f"document. Errors:\n{joined_details}"
    )
def extract_nlprp_top_level_results(nlp_data: JsonObjectType) -> List:
    """
    Validate a top-level NLPRP response and return its per-document results.

    Args:
        nlp_data: the decoded top-level JSON response object.

    Returns:
        the list stored under the results key (one entry per document).

    Raises:
        - :exc:`KeyError` if the results key is absent
        - :exc:`ValueError` if the results value is not a list
    """
    key = NlprpKeys.RESULTS
    try:
        results = nlp_data[key]
    except KeyError:
        raise KeyError(
            f"Top-level response does not contain key {key!r}: {nlp_data!r}"
        )
    if isinstance(results, list):
        return results
    raise ValueError(f"{key!r} value is not a list: {results!r}")
def parse_nlprp_docresult_metadata(
    docresult: JsonObjectType,
) -> Tuple[Dict[str, Any], Optional[int], Optional[str], str]:
    """
    Check that this NLPRP document result validly contains metadata, and that
    the metadata contains the things we always send. Extract the key
    components, providing a helpful error message on failure.

    Args:
        docresult: a single per-document result from an NLPRP response.

    Returns:
        tuple ``(metadata, pkval, pkstr, srchash)``

    Raises:
        - :exc:`KeyError` if the metadata is missing or not a dict, or if any
          of the source PK value, source PK string, or source hash fields is
          missing or has an unexpected type
        - :exc:`ValueError` if both the source PK value and PK string are
          null
    """
    try:
        metadata = docresult[NlprpKeys.METADATA]
    except KeyError:
        raise KeyError(
            "Document result does not contain key "
            f"{NlprpKeys.METADATA!r}: {docresult!r}"
        )
    if not isinstance(metadata, dict):
        # ... expected type because that's what we sent; see add_text()
        raise KeyError(f"Document result metadata is not a dict: {metadata!r}")

    # Integer primary key (may be null if a string PK is used instead).
    try:
        pkval = metadata[FN_SRCPKVAL]
    except KeyError:
        raise KeyError(
            "Document metadata does not contain key "
            f"{FN_SRCPKVAL!r}: {metadata!r}"
        )
    if not isinstance(pkval, (int, NoneType)):
        # ... expected type because that's what we sent; see add_text()
        raise KeyError(
            f"Document result metadata {FN_SRCPKVAL!r} is not null or int: "
            f"{pkval!r}"
        )

    # String primary key (may be null if an integer PK is used instead).
    try:
        pkstr = metadata[FN_SRCPKSTR]
    except KeyError:
        raise KeyError(
            "Document metadata does not contain key "
            f"{FN_SRCPKSTR!r}: {metadata!r}"
        )
    if not isinstance(pkstr, (str, NoneType)):
        # Report the field actually being checked (previously this message
        # wrongly named the integer-PK field).
        raise KeyError(
            f"Document result metadata {FN_SRCPKSTR!r} is not null or str: "
            f"{pkstr!r}"
        )

    # At least one form of primary key is required to identify the source.
    if pkval is None and pkstr is None:
        raise ValueError(
            f"In document result, both {FN_SRCPKVAL!r} and "
            f"{FN_SRCPKSTR!r} are null"
        )

    # Source hash: the messages must name FN_SRCHASH, not FN_SRCPKSTR.
    try:
        srchash = metadata[FN_SRCHASH]
    except KeyError:
        raise KeyError(
            "Document metadata does not contain key "
            f"{FN_SRCHASH!r}: {metadata!r}"
        )
    if not isinstance(srchash, str):
        raise KeyError(
            f"Document result metadata {FN_SRCHASH!r} is not str: "
            f"{srchash!r}"
        )

    return metadata, pkval, pkstr, srchash
def extract_processor_data_list(
    docresult: JsonObjectType,
) -> List[JsonObjectType]:
    """
    Validate a single-document NLPRP result and return its list of
    per-processor results.

    Raises:
        - :exc:`KeyError` if the processors key is absent
        - :exc:`ValueError` if the processors value is not a list
    """
    key = NlprpKeys.PROCESSORS
    try:
        per_processor = docresult[key]
    except KeyError:
        raise KeyError(
            f"Document result does not contain key {key!r}: {docresult!r}"
        )
    if isinstance(per_processor, list):
        return per_processor
    raise ValueError(
        f"Document result's {key!r} element is not a "
        f"list: {per_processor!r}"
    )
def parse_per_processor_data(processor_data: Dict[str, Any]) -> Tuple:
    """
    Unpack the mandatory fields of one processor's NLPRP result.

    Returns:
        tuple ``(name, version, is_default_version, success, results)``.
        ``is_default_version`` is ``True`` if the key is absent.

    Raises:
        - :exc:`ValueError` if ``processor_data`` is not a dict
        - :exc:`KeyError` if any of the name, version, success or results
          keys is missing
    """
    if not isinstance(processor_data, dict):
        raise ValueError(f"Processor result is not a dict: {processor_data!r}")

    def mandatory(key: str) -> Any:
        # Fetch a required key, with a descriptive error if it is absent.
        try:
            return processor_data[key]
        except KeyError:
            raise KeyError(
                "Processor result does not contain key "
                f"{key!r}: {processor_data!r}"
            )

    # Keys are fetched in the same order as the NLPRP fields are listed, so
    # the first missing one is the one reported.
    name = mandatory(NlprpKeys.NAME)
    version = mandatory(NlprpKeys.VERSION)
    is_default_version = processor_data.get(NlprpKeys.IS_DEFAULT_VERSION, True)
    success = mandatory(NlprpKeys.SUCCESS)
    processor_results = mandatory(NlprpKeys.RESULTS)
    return name, version, is_default_version, success, processor_results
305# =============================================================================
306# Exceptions
307# =============================================================================
class RecordNotPrintable(Exception):
    """
    Raised when a record's text contains no word characters, so it is not
    added to the NLP request.
    """
class RecordsPerRequestExceeded(Exception):
    """
    Raised when adding a record would exceed the configured maximum number of
    records per request.
    """
class RequestTooLong(Exception):
    """
    Raised when adding a record would make the request longer than the
    configured maximum content length.
    """
322# =============================================================================
323# CloudRequest
324# =============================================================================
class CloudRequest:
    """
    Class to send requests to the cloud processors and process the results.

    Requests are sent as JSON via HTTP POST, using the URL, credentials,
    compression and retry settings from the NLP definition's cloud config.
    """

    # Set up standard information for all requests
    HTTP_HEADERS = {"charset": "utf-8", "Content-Type": "application/json"}

    # -------------------------------------------------------------------------
    # Initialization
    # -------------------------------------------------------------------------

    def __init__(
        self,
        nlpdef: NlpDefinition,
        debug_post_request: bool = False,
        debug_post_response: bool = False,
    ) -> None:
        """
        Args:
            nlpdef:
                :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
            debug_post_request:
                log each outbound request (pretty-printed) at DEBUG level?
            debug_post_response:
                log each inbound response (pretty-printed) at DEBUG level?
        """
        self._nlpdef = nlpdef
        self._cloudcfg = nlpdef.get_cloud_config_or_raise()
        self._nlpdef_sectionname = full_sectionname(
            NlpConfigPrefixes.NLPDEF, self._nlpdef.name
        )
        self._auth = (self._cloudcfg.username, self._cloudcfg.password)
        # Do NOT assign self._post here. An instance attribute would shadow
        # the class attribute installed by set_rate_limit(), silently
        # disabling rate limiting. The default is the class-level
        # "_post = _internal_post" defined below.
        self.cookies = None  # type: Optional[CookieJar]
        self._debug_post_request = debug_post_request
        self._debug_post_response = debug_post_response
        # Set to True by _internal_post() when a permitted failure occurs.
        self.request_failed = False

    # -------------------------------------------------------------------------
    # HTTP
    # -------------------------------------------------------------------------

    @classmethod
    def set_rate_limit(cls, rate_limit_hz: int) -> None:
        """
        Creates new methods which are rate limited. Only use this once per run.

        Note that this is a classmethod and must be so; if it were
        instance-based, you could create multiple requests and each would
        individually be rate-limited, but not collectively. For the same
        reason, instances must never assign ``self._post``: an instance
        attribute would hide the class attribute set here.

        Args:
            rate_limit_hz:
                maximum request rate in Hz; 0 or less means "no limit".
        """
        if rate_limit_hz > 0:
            # Rate-limited
            cls._post = rate_limited(rate_limit_hz)(cls._internal_post)
        else:
            # No limits!
            cls._post = cls._internal_post

    def _internal_post(
        self, request_json: str, may_fail: bool = None
    ) -> Optional[Response]:
        """
        Submits an HTTP POST request to the remote, making up to
        ``max_tries`` attempts (from the cloud config; always at least one).

        Notes:

        - The Python ``requests`` library automatically applies
          ``Accept-Encoding: gzip, deflate`` to outbound HTTP requests, and
          automatically gzip-decodes responses.
        - However, we have to do outbound compression manually.

        Args:
            request_json: JSON (string) request.
            may_fail: may the request fail? Boolean, or ``None`` to use the
                value from the cloud NLP config

        Returns:
            :class:`requests.Response`, or ``None`` for failure (if failure is
            permitted by ``may_fail``).

        Raises:
            - :exc:`RequestException` if max retries exceeded and we are
              stopping on failure
        """
        if may_fail is None:
            may_fail = not self._cloudcfg.stop_at_failure
        if self._cloudcfg.compress:
            headers = self.HTTP_HEADERS.copy()
            headers["Content-Encoding"] = "gzip"
            data = gzip_string(request_json)
        else:
            headers = self.HTTP_HEADERS
            data = request_json

        # The previous "tries <= max_tries" loop made max_tries + 1 attempts.
        # Always make at least one attempt, even if max_tries is 0/None.
        max_tries = max(1, self._cloudcfg.max_tries or 0)
        for attempt in range(1, max_tries + 1):
            if self._debug_post_request:
                formatted_request = json.dumps(
                    json.loads(request_json), indent=JSON_INDENT
                )
                log.debug(
                    f"Sending to {self._cloudcfg.url} :\n"
                    f"{formatted_request}"
                )
            try:
                response = post(
                    url=self._cloudcfg.url,
                    data=data,
                    auth=self._auth,
                    headers=headers,
                    cookies=self.cookies,
                    verify=self._cloudcfg.verify_ssl,
                )
            except (RequestException, NewConnectionError) as e:
                if attempt < max_tries:
                    self._sleep_for_remote(e)  # logs, then waits
                else:
                    # No point waiting after the final attempt.
                    log.error(e)
                continue
            if self._debug_post_response:
                try:
                    formatted_response = json.dumps(
                        response.json(), indent=JSON_INDENT
                    )
                except (AttributeError, json.decoder.JSONDecodeError):
                    formatted_response = ""
                log.debug(
                    f"Received from {self._cloudcfg.url} :\n"
                    f"{response}\n"
                    f"{formatted_response}"
                )
            self.cookies = response.cookies
            return response

        # Failure: every attempt raised.
        msg = "Max tries exceeded. Request has failed."
        log.error(msg)
        if may_fail:
            self.request_failed = True
            return None
        raise RequestException(msg)

    # Default (unlimited) poster. set_rate_limit() may replace this at class
    # level; because it lives on the class, the replacement applies to all
    # instances collectively.
    _post = _internal_post

    def _post_get_json(
        self, request_json: str, may_fail: bool = False
    ) -> Optional[JsonObjectType]:
        """
        Executes :meth:`_post`, then parses the result as JSON.

        Args:
            request_json: JSON (string) request.
            may_fail: may the request fail? (``None`` to use the config)

        Returns:
            dict: JSON object, or ``None`` upon a permitted failure

        Raises:
            - :exc:`RequestException` if max retries exceeded and we are
              stopping on failure
            - :exc:`JSONDecodeError` for bad JSON
            - :exc:`AssertionError` if the JSON is not an object (dict)
        """
        response = self._post(request_json, may_fail=may_fail)
        if response is None:
            # Only returned when failure was permitted.
            return None
        try:
            json_response = response.json()
        except json.decoder.JSONDecodeError:
            log.error("Reply was not JSON")
            raise
        if not isinstance(json_response, dict):
            log.error("Reply was JSON but not a JSON object (dict)")
            # Explicit check (not "assert") so it survives "python -O"; the
            # AssertionError type is kept for compatibility with callers.
            raise AssertionError(
                "Expected a JSON object, got "
                f"{type(json_response).__name__}"
            )
        return json_response

    def _sleep_for_remote(self, exc: Exception) -> None:
        """
        Wait for a while, because the remote is unhappy for some reason.

        Args:
            exc: exception that caused us to wait.
        """
        log.error(exc)
        time_s = self._cloudcfg.wait_on_conn_err
        log.warning(f"Retrying in {time_s} seconds.")
        time.sleep(time_s)
515# =============================================================================
516# CloudRequestListProcessors
517# =============================================================================
class CloudRequestListProcessors(CloudRequest):
    """
    Request to get processors from the remote.
    """

    def __init__(self, nlpdef: NlpDefinition, **kwargs) -> None:
        """
        Args:
            nlpdef:
                :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
            kwargs:
                passed to :class:`CloudRequest`
        """
        super().__init__(nlpdef=nlpdef, **kwargs)

    def get_remote_processors(self) -> List[ServerProcessor]:
        """
        Asks the remote NLPRP server for its list of available processors.

        A new request is sent on every call; the result is not cached.

        Returns:
            list of
            :class:`crate_anon.nlp_webserver.server_processor.ServerProcessor`

        Raises:
            - :exc:`RequestException` if the POST fails after all retries
            - :exc:`requests.exceptions.HTTPError` if the server reports a
              bad status
            - :exc:`KeyError` if the response lacks the processors key, or a
              processor description lacks a mandatory field
            - :exc:`ValueError` if the processors value is not a list, or one
              of its entries is not a dict
        """
        # Make request
        list_procs_request = make_nlprp_dict()
        list_procs_request[NlprpKeys.COMMAND] = NlprpCommands.LIST_PROCESSORS
        request_json = to_json_str(list_procs_request)

        # Send request, get response (failure is not permitted here, so this
        # never returns None).
        json_response = self._post_get_json(request_json, may_fail=False)

        status = json_get_int(json_response, NlprpKeys.STATUS)
        if not HttpStatus.is_good_answer(status):
            errors = json_get_array(json_response, NlprpKeys.ERRORS)
            for err in errors:
                log.error(f"Error received: {err!r}")
            raise HTTPError(f"Response status was: {status}")

        try:
            proclist = json_response[
                NlprpKeys.PROCESSORS
            ]  # type: JsonArrayType
        except KeyError:
            raise KeyError(
                f"Server did not provide key {NlprpKeys.PROCESSORS!r} in its "
                f"response: {json_response!r}"
            )
        if not isinstance(proclist, list):
            raise ValueError(
                f"Server's value of {NlprpKeys.PROCESSORS!r} is not a list: "
                f"{proclist!r}"
            )

        processors = []  # type: List[ServerProcessor]
        for procinfo in proclist:
            if not isinstance(procinfo, dict):
                raise ValueError(
                    f"Server's procinfo object not a dict: {procinfo!r}"
                )
            # Any of the mandatory fields may raise KeyError if missing:
            try:
                proc = ServerProcessor(
                    # Mandatory:
                    name=procinfo[NlprpKeys.NAME],
                    title=procinfo[NlprpKeys.TITLE],
                    version=procinfo[NlprpKeys.VERSION],
                    is_default_version=procinfo.get(
                        NlprpKeys.IS_DEFAULT_VERSION, True
                    ),
                    description=procinfo[NlprpKeys.DESCRIPTION],
                    # Optional:
                    schema_type=procinfo.get(
                        NlprpKeys.SCHEMA_TYPE, NlprpValues.UNKNOWN
                    ),
                    sql_dialect=procinfo.get(NlprpKeys.SQL_DIALECT, ""),
                    tabular_schema=procinfo.get(NlprpKeys.TABULAR_SCHEMA),
                )
            except KeyError:
                log.critical(
                    "NLPRP server's processor information is missing a "
                    "required field"
                )
                raise
            processors.append(proc)
        return processors
602# =============================================================================
603# CloudRequestProcess
604# =============================================================================
607class CloudRequestProcess(CloudRequest):
608 """
609 Request to process text.
610 """
612 def __init__(
613 self,
614 crinfo: "CloudRunInfo" = None,
615 nlpdef: NlpDefinition = None,
616 commit: bool = False,
617 client_job_id: str = None,
618 **kwargs,
619 ) -> None:
620 """
621 Args:
622 crinfo:
623 a :class:`crate_anon.nlp_manager.cloud_run_info.CloudRunInfo`
624 nlpdef:
625 a :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
626 commit:
627 force a COMMIT whenever we insert data? You should specify this
628 in multiprocess mode, or you may get database deadlocks.
629 client_job_id:
630 optional string used to group together results into one job.
631 """
632 assert nlpdef or crinfo
633 if nlpdef is None:
634 nlpdef = crinfo.nlpdef
635 super().__init__(nlpdef=nlpdef, **kwargs)
636 self._crinfo = crinfo
637 self._commit = commit
638 self._fetched = False
639 self._client_job_id = client_job_id or ""
640 # How many records have been added to this particular request?
641 self.number_of_records = 0
643 # Set up processing request
644 self._request_process = make_nlprp_dict()
645 self._request_process[NlprpKeys.COMMAND] = NlprpCommands.PROCESS
646 self._request_process[NlprpKeys.ARGS] = {
647 NlprpKeys.PROCESSORS: [], # type: List[str]
648 NlprpKeys.QUEUE: True,
649 NlprpKeys.CLIENT_JOB_ID: self._client_job_id,
650 NlprpKeys.INCLUDE_TEXT: False,
651 NlprpKeys.CONTENT: [], # type: List[str]
652 }
653 # Set up fetch_from_queue request
654 self._fetch_request = make_nlprp_dict()
655 self._fetch_request[NlprpKeys.COMMAND] = NlprpCommands.FETCH_FROM_QUEUE
657 self.nlp_data = None # type: Optional[JsonObjectType]
658 # ... the JSON response
659 self.queue_id = None # type: Optional[str]
661 self.request_failed = False
663 # Of the form:
664 # {(procname, version): 'Cloud' object}
665 self.requested_processors = (
666 self._cloudcfg.remote_processors
667 ) # type: Dict[Tuple[str, Optional[str]], Cloud]
669 if crinfo:
670 self._add_all_processors_to_request() # may raise
672 # -------------------------------------------------------------------------
673 # Sending text to the server
674 # -------------------------------------------------------------------------
676 def _process_request_too_long(self, max_length: Optional[int]) -> bool:
677 """
678 Is the number of bytes in the outbound JSON request more than the
679 permitted maximum?
681 Args:
682 max_length:
683 the maximum length; 0 or ``None`` for no limit
685 Notes:
687 The JSON method was found to be slow.
689 Specimen methods:
691 .. code-block:: python
693 import timeit
694 setup = '''
695 import json
696 from crate_anon.common.constants import JSON_SEPARATORS_COMPACT
697 from crate_anon.common.memsize import getsize
698 stringlength = 100000 # 100 kb
699 testdict = {'a': 1, 'b': {'c': [2,3,4, 'x' * stringlength]}}
700 '''
701 v1 = "len(json.dumps(testdict, separators=JSON_SEPARATORS_COMPACT).encode('utf-8'))"
702 timeit.timeit(v1, setup=setup, number=1000)
703 # ... answer is total time in s, and therefore per-call time in milliseconds
704 # v1 gives e.g. 0.39ms
706 v2 = "getsize(testdict)"
707 timeit.timeit(v2, setup=setup, number=1000)
708 # v2 gives 0.006 ms
710 In general, long strings (which is the thing we're watching out for)
711 make :func:`json.dumps` particularly slow.
713 But also, in general, Python objects seem to take up more space than
714 their JSON representation; e.g. compare
716 .. code-block:: python
718 import json
719 from crate_anon.common.constants import JSON_SEPARATORS_COMPACT
720 from crate_anon.common.memsize import getsize
721 from typing import Any
723 def compare(x: Any) -> None:
724 json_utf8_length = len(
725 json.dumps(x, separators=JSON_SEPARATORS_COMPACT).encode('utf-8')
726 )
727 python_length = getsize(x)
728 print(f"{x!r} -> JSON-UTF8 {json_utf8_length}, Python {python_length}")
731 compare("a") # JSON-UTF8 3, Python 50
732 compare(1) # JSON-UTF8 1, Python 28
733 compare({"a": 1, "b": [2, 3, "xyz"]}) # JSON-UTF8 23, Python 464
735 It can be quite a big overestimate, so we probably shouldn't chuck
736 out requests just because the Python size looks too big.
738 """ # noqa: E501
739 if not max_length: # None or 0
740 return False # no maximum; not too long
741 # Fast, apt to overestimate size a bit (as above)
742 if self._cloudcfg.test_length_function_speed:
743 length = getsize(self._request_process, assume_none_denied=True)
745 if length <= max_length: # test the Python length
746 # Because the Python length is an overestimate of the JSON, if
747 # that is not more than the max, we can stop.
748 return False # not too long
750 # The Python size is too long. So now we recalculate using the slow but
751 # accurate way.
752 length = utf8len(to_json_str(self._request_process))
754 # Is it too long?
755 return length > max_length
757 def _add_processor_to_request(
758 self, procname: str, procversion: str
759 ) -> None:
760 """
761 Add a remote processor to the list of processors that we will request
762 results from.
764 Args:
765 procname: name of processor on the server
766 procversion: version of processor on the server
767 """
768 info = {NlprpKeys.NAME: procname}
769 if procversion:
770 info[NlprpKeys.VERSION] = procversion
771 self._request_process[NlprpKeys.ARGS][NlprpKeys.PROCESSORS].append(
772 info
773 )
775 def _add_all_processors_to_request(self) -> None:
776 """
777 Adds all requested processors.
778 """
779 bad = [] # type: List[str]
780 for name_version, cloudproc in self.requested_processors.items():
781 name = name_version[0]
782 version = name_version[1]
783 if cloudproc.available_remotely:
784 self._add_processor_to_request(name, version)
785 else:
786 bad.append(f"- {name!r} (version {version})")
787 if bad:
788 raise RuntimeError(
789 f"The following NLP processors are not available from the "
790 f"NLPRP server at {self._crinfo.cloudcfg.url!r}:\n"
791 + "\n".join(bad)
792 )
794 def add_text(self, text: str, metadata: Dict[str, Any]) -> None:
795 """
796 Adds text for analysis to the NLP request, with associated metadata.
798 Tests the size of the request if the text and metadata was added, then
799 adds it if it doesn't go over the size limit and there are word
800 characters in the text. Also checks if we've reached the maximum
801 records per request.
803 Args:
804 text: the text
805 metadata: the metadata (which we expect to get back later)
807 Raises:
808 - :exc:`RecordNotPrintable` if the record contains no printable
809 characters
810 - :exc:`RecordsPerRequestExceeded` if the request has exceeded the
811 maximum number of records per request
812 - :exc:`RequestTooLong` if the request has exceeded the maximum
813 length
814 """
815 if not does_text_contain_word_chars(text):
816 raise RecordNotPrintable
818 self.number_of_records += 1
819 if self.number_of_records > self._cloudcfg.max_records_per_request:
820 raise RecordsPerRequestExceeded
822 new_content = {NlprpKeys.METADATA: metadata, NlprpKeys.TEXT: text}
823 # Add all the identifying information.
824 args = self._request_process[NlprpKeys.ARGS]
825 content_key = NlprpKeys.CONTENT # local copy for fractional speedup
826 old_content = copy(args[content_key])
827 args[content_key].append(new_content)
828 max_length = self._cloudcfg.max_content_length
829 # Slow -- is there a way to get length without having to serialize?
830 # At least -- do it only once (forgiveness not permission, etc.).
831 if self._process_request_too_long(max_length):
832 # log.warning("too long!")
833 # Too long. Restore the previous state!
834 args[content_key] = old_content
835 raise RequestTooLong
837 def send_process_request(
838 self,
839 queue: bool,
840 cookies: CookieJar = None,
841 include_text_in_reply: bool = True,
842 ) -> None:
843 """
844 Sends a request to the server to process the text we have stored.
846 Args:
847 queue:
848 queue the request for back-end processing (rather than waiting
849 for an immediate reply)?
850 cookies:
851 optional :class:`http.cookiejar.CookieJar`
852 include_text_in_reply:
853 should the server include the source text in the reply?
854 """
855 # Don't send off an empty request
856 if not self._request_process[NlprpKeys.ARGS][NlprpKeys.CONTENT]:
857 log.warning("Request empty - not sending.")
858 return
860 # Create request
861 if cookies:
862 self.cookies = cookies
863 self._request_process[NlprpKeys.ARGS][NlprpKeys.QUEUE] = queue
864 self._request_process[NlprpKeys.ARGS][
865 NlprpKeys.INCLUDE_TEXT
866 ] = include_text_in_reply
867 request_json = to_json_str(self._request_process)
869 # Send request; get response
870 json_response = self._post_get_json(request_json)
872 status = json_response[NlprpKeys.STATUS]
873 if queue and status == HttpStatus.ACCEPTED:
874 self.queue_id = json_response[NlprpKeys.QUEUE_ID]
875 self._fetched = False
876 elif (not queue) and status == HttpStatus.OK:
877 self.nlp_data = json_response
878 self._fetched = True
879 else:
880 log.error(f"Got HTTP status code {status}.")
881 log.error(f"Response from server: {json_response}")
882 if self._cloudcfg.stop_at_failure:
883 raise HTTPError
884 else:
885 self.request_failed = True
886 return
888 # -------------------------------------------------------------------------
889 # Queue management for processing requests
890 # -------------------------------------------------------------------------
892 def set_queue_id(self, queue_id: str) -> None:
893 """
894 Sets the queue_id. To be used when you're not actually sending a
895 request this time.
896 """
897 self.queue_id = queue_id
899 def _try_fetch(
900 self, cookies: CookieJar = None
901 ) -> Optional[JsonObjectType]:
902 """
903 Tries to fetch the response from the server. Assumes queued mode.
904 Returns the JSON response.
905 """
906 # Create request
907 if cookies:
908 self.cookies = cookies
909 self._fetch_request[NlprpKeys.ARGS] = {
910 NlprpKeys.QUEUE_ID: self.queue_id
911 }
912 request_json = to_json_str(self._fetch_request)
914 # Send request; get response
915 json_response = self._post_get_json(request_json)
916 return json_response
918 def check_if_ready(self, cookies: CookieJar = None) -> bool:
919 """
920 Checks if the data is ready yet. Assumes queued mode (so
921 :meth:`set_queue_id` should have been called first). If the data is
922 ready, collect it and return ``True``, else return ``False``.
923 """
924 if self.queue_id is None:
925 log.warning("Tried to fetch from queue before sending request.")
926 return False
927 if self._fetched:
928 return False # todo: check with FS; is that the right response?
929 json_response = self._try_fetch(cookies)
930 if not json_response:
931 return False
932 status = json_response[NlprpKeys.STATUS]
933 pending_use_202 = (
934 Version(json_response[NlprpKeys.VERSION])
935 >= NlprpVersions.FETCH_Q_PENDING_RETURNS_202
936 )
937 if status == HttpStatus.OK:
938 self.nlp_data = json_response
939 self._fetched = True
940 return True
941 elif not pending_use_202 and status == HttpStatus.PROCESSING:
942 # Old server version returning 102 (Processing) (deprecated).
943 return False
944 elif pending_use_202 and status == HttpStatus.ACCEPTED:
945 # Newer server version returning 202 (Accepted).
946 return False
947 elif status == HttpStatus.NOT_FOUND:
948 # print(json_response)
949 log.error(
950 f"Got HTTP status code {HttpStatus.NOT_FOUND} - "
951 f"queue_id {self.queue_id} does not exist"
952 )
953 return False
954 else:
955 log.error(
956 f"Got HTTP status code {status} for queue_id {self.queue_id}."
957 )
958 return False
960 # -------------------------------------------------------------------------
961 # Results handling
962 # -------------------------------------------------------------------------
964 @staticmethod
965 def gen_nlp_values_generic_single_table(
966 processor: Cloud,
967 tablename: str,
968 rows: List[Dict[str, Any]],
969 metadata: Dict[str, Any],
970 column_renames: Dict[str, str] = None,
971 ) -> Generator[Tuple[str, Dict[str, Any], Cloud], None, None]:
972 """
973 Get result values from processed data, where the results object is a
974 list of rows (each row in dictionary format), all for a single table,
975 such as from a remote CRATE server.
977 Success should have been pre-verified.
979 Args:
980 processor:
981 The processor object.
982 tablename:
983 The table name to use.
984 rows:
985 List of NLPRP results for one processor. Each result represents
986 a row of a table and is in dictionary format.
987 metadata:
988 The metadata for a particular document - it would have been
989 sent with the document and the server would have sent it back.
990 column_renames:
991 Column renames to apply.
993 Yields ``(output_tablename, formatted_result, processor)``.
995 """
996 column_renames = column_renames or {}
997 for row in rows:
998 rename_keys_in_dict(row, column_renames)
999 row.update(metadata)
1000 yield tablename, row, processor
@staticmethod
def gen_nlp_values_gate(
    processor: Cloud,
    processor_results: List[Dict[str, Any]],
    metadata: Dict[str, Any],
    text: str = "",
) -> Generator[Tuple[str, Dict[str, Any], Cloud], None, None]:
    """
    Turns processed GATE-format annotations into output rows.

    Success should already have been checked by the caller.

    Args:
        processor:
            The processor object.
        processor_results:
            A list of dictionaries (originally from JSON), one per
            annotation/row, each expected to look like:

            .. code-block:: none

                {
                    'set': set the results belong to (e.g. 'Medication'),
                    'type': annotation type,
                    'start': start index,
                    'end': end index,
                    'features': {
                        a dictionary of features, e.g. having keys 'drug',
                        'frequency', etc., with corresponding values
                    }
                }

        metadata:
            Per-document metadata that was sent with the document and
            echoed back by the server; merged into every row.
        text:
            The source text itself (optional). If supplied (and non-empty),
            the annotated fragment ``text[start:end]`` is stored as the
            content; otherwise the content is an empty string.

    Yields:
        tuples ``(output_tablename, formatted_result, processor)``, where
        each ``formatted_result`` looks like:

        .. code-block:: none

            {
                GateFieldNames.TYPE: annotation type (lower case),
                GateFieldNames.SET: set,
                GateFieldNames.STARTPOS: start index,
                GateFieldNames.ENDPOS: end index,
                GateFieldNames.CONTENT: text fragment,
                FEATURE1: VALUE1,
                FEATURE2: VALUE2,
                ...
            }

        after the output-type config's column renames and null-literal
        substitutions have been applied, and the metadata merged in.
    """
    for annotation in processor_results:
        # Each annotation states its own type; types are stored lower-case.
        annotation_type = annotation[GateResultKeys.TYPE].lower()
        feature_values = annotation[GateResultKeys.FEATURES]
        startpos = annotation[GateResultKeys.START]
        endpos = annotation[GateResultKeys.END]
        annotation_set = annotation[GateResultKeys.SET]
        if text:
            content = text[startpos:endpos]
        else:
            content = ""

        output = {
            GateFieldNames.TYPE: annotation_type,
            GateFieldNames.SET: annotation_set,
            GateFieldNames.STARTPOS: startpos,
            GateFieldNames.ENDPOS: endpos,
            GateFieldNames.CONTENT: content,
        }
        output.update(feature_values)

        # Apply the per-output-type configuration for this annotation type.
        otconf = processor.get_otconf_from_type(annotation_type)
        rename_keys_in_dict(output, otconf.renames)
        set_null_values_in_dict(output, otconf.null_literals)

        output.update(metadata)
        yield (
            processor.get_tablename_from_type(annotation_type),
            output,
            processor,
        )
def gen_nlp_values(
    self,
) -> Generator[Tuple[str, Dict[str, Any], Cloud], None, None]:
    """
    Process response data that we have already obtained from the server,
    generating individual NLP results.

    If the server reports that a processor failed for a document, that
    failure is reported via :func:`report_processor_errors` and only that
    processor's results for that document are skipped; results from other
    processors and other documents are still generated.

    Yields:
        ``(tablename, result, processor)`` for each result.
        The ``tablename`` value is the actual destination database table.

    Raises:
        :exc:`KeyError` if an unexpected processor turned up in the results.
        :exc:`RuntimeError` if a processor returns multi-table results but
            has no tabular schema.
        :exc:`ValueError` if results refer to a table not in the schema, are
            in list format for a schema with more than one table, or are in
            an unrecognized format.
    """
    # Method should only be called if we already have the nlp data.
    assert self.nlp_data, (
        "Method 'gen_nlp_values' must only be called "
        "after nlp_data is obtained"
    )
    docresultlist = extract_nlprp_top_level_results(self.nlp_data)
    for docresult in docresultlist:
        metadata, _, _, _ = parse_nlprp_docresult_metadata(docresult)
        text = docresult.get(NlprpKeys.TEXT)
        processor_data_list = extract_processor_data_list(docresult)
        for processor_data in processor_data_list:
            # Details of the server processor that has responded:
            (
                name,
                version,
                is_default_version,
                success,
                processor_results,
            ) = parse_per_processor_data(processor_data)

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Check that the processor was one we asked for.
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            try:
                # Retrieve the Python object corresponding to the server
                # processor that has responded:
                processor = self.requested_processors[(name, version)]
            except KeyError:
                # We did not request this processor name/version.
                failmsg = (
                    f"Server returned processor {name} version {version}, "
                    f"but this processor was not requested."
                )
                if not is_default_version:
                    # The server's processor is not the default version, so
                    # we couldn't have obtained it by asking without a
                    # version number.
                    raise KeyError(failmsg) from None
                try:
                    # Did we ask for this processor by name without caring
                    # about its version number, and obtain it that way (as
                    # default version)? Index rather than use .get(): .get()
                    # never raises KeyError, so a miss would give None and
                    # fail later with an unhelpful AttributeError.
                    processor = self.requested_processors[(name, None)]
                except KeyError:
                    # No.
                    raise KeyError(failmsg) from None

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # OK; we're happy with the processor. Was it happy?
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            if not success:
                report_processor_errors(processor_data)
                # Skip only this processor's results for this document.
                # (Returning here would silently discard every remaining
                # processor's and document's results, after the caller had
                # already consumed the earlier ones.)
                continue

            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # All well. Process the results.
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # See nlprp.rst, <nlprp_format_of_per_processor_results>.
            if isinstance(processor_results, dict):
                # MULTI-TABLE FORMAT.
                # This is a dictionary mapping tables to row lists.
                if not processor.is_tabular():
                    raise RuntimeError(
                        f"Unsupported: processor {name!r} is returning "
                        f"multi-table results but hasn't provided a "
                        f"table schema"
                    )
                tnames = processor.get_tabular_schema_tablenames()
                for remote_tablename, rows in processor_results.items():
                    if remote_tablename not in tnames:
                        raise ValueError(
                            f"For processor {name!r}, data provided for "
                            f"table {remote_tablename!r}, but this was "
                            "not in the schema"
                        )
                    dest_tablename = processor.get_tablename_from_type(
                        remote_tablename
                    )
                    yield from self.gen_nlp_values_generic_single_table(
                        processor=processor,
                        tablename=dest_tablename,
                        rows=rows,
                        metadata=metadata,
                        column_renames=processor.get_otconf_from_type(
                            remote_tablename
                        ).renames,
                    )
            elif isinstance(processor_results, list):
                # SINGLE TABLE FORMAT.
                # This is a list of rows, where each row should be a
                # dictionary mapping column names to values.
                if processor.format == NlpDefValues.FORMAT_GATE:
                    # We have special knowledge of the "traditional" GATE
                    # format. The sub-function will work out the table
                    # name(s).
                    yield from self.gen_nlp_values_gate(
                        processor=processor,
                        processor_results=processor_results,
                        metadata=metadata,
                        text=text,
                    )
                else:
                    # Potentially valid whether or not there is a
                    # tabular_schema. The results object is a generic list
                    # of column_name/value dictionaries.
                    if processor.is_tabular():
                        # Only valid here if there is a SINGLE table in
                        # the tabular_schema.
                        tnames = processor.get_tabular_schema_tablenames()
                        if len(tnames) != 1:
                            raise ValueError(
                                f"Processor {name!r} returned results in "
                                "list format, but this is only valid for "
                                "a single table; its tables are "
                                f"{tnames!r}"
                            )
                        remote_tablename = tnames[0]
                    else:
                        # We use the FIRST defined table name.
                        remote_tablename = processor.get_first_tablename()
                    dest_tablename = processor.get_tablename_from_type(
                        remote_tablename
                    )
                    yield from self.gen_nlp_values_generic_single_table(
                        processor=processor,
                        tablename=dest_tablename,
                        rows=processor_results,
                        metadata=metadata,
                        column_renames=processor.get_otconf_from_type(
                            remote_tablename
                        ).renames,
                    )
            else:
                raise ValueError(
                    f"For processor {name!r}, bad results format: "
                    f"{processor_results!r}"
                )
# @do_cprofile
def process_all(self) -> None:
    """
    Puts the NLP data into the database. Very similar to
    :meth:`crate_anon.nlp_manager.base_nlp_parser.BaseNlpParser.process`,
    but deals with all relevant processors at once.

    For each result generated by :meth:`gen_nlp_values`:

    - the NLP definition name is stored under ``FN_NLPDEF``;
    - any ``FN_WHEN_FETCHED`` value is converted from its NLPRP string form
      to a naive UTC datetime;
    - keys that are not columns of the destination table are dropped;
    - the row is inserted via the processor's destination session, and the
      NLP definition is notified of the transaction.

    A :exc:`DatabaseError` on an individual insert is logged and processing
    continues. Every destination session used is committed at the end.
    """
    nlpname = self._nlpdef.name

    # Destination sessions seen so far, in first-use order, so that each is
    # committed exactly once at the end.
    sessions = []

    for tablename, nlp_values, processor in self.gen_nlp_values():
        nlp_values[FN_NLPDEF] = nlpname
        session = processor.dest_session
        if session not in sessions:
            sessions.append(session)
        sqla_table = processor.get_table(tablename)
        # A set, since it is only used for membership tests below.
        column_names = {c.name for c in sqla_table.columns}
        # Convert string datetime back into datetime, using UTC. Only this
        # one key needs converting, so look it up directly rather than
        # scanning every key.
        if FN_WHEN_FETCHED in nlp_values:
            nlp_values[FN_WHEN_FETCHED] = (
                nlprp_datetime_to_datetime_utc_no_tzinfo(
                    nlp_values[FN_WHEN_FETCHED]
                )
            )
        final_values = {
            k: v for k, v in nlp_values.items() if k in column_names
        }
        insertquery = sqla_table.insert().values(final_values)
        try:
            with MultiTimerContext(timer, TIMING_INSERT):
                session.execute(insertquery)
        except DatabaseError as e:
            log.error(e)
            # ... but proceed.
            # NOTE(review): the row is still counted by notify_transaction
            # below even though the insert failed, and some database
            # backends abort the whole open transaction after an error —
            # confirm this best-effort behaviour is intended.
        self._nlpdef.notify_transaction(
            session,
            n_rows=1,
            # sys.getsizeof is shallow, so this is only a rough estimate.
            n_bytes=sys.getsizeof(final_values),
            force_commit=self._commit,
        )
    for session in sessions:
        session.commit()
# =============================================================================
# CloudRequestQueueManagement
# =============================================================================


class CloudRequestQueueManagement(CloudRequest):
    """
    Request to manage the queue in some way.
    """

    def show_queue(self) -> Optional[List[Dict[str, Any]]]:
        """
        Returns a list of the user's queued requests. Each list element is a
        dictionary as returned according to the :ref:`NLPRP <nlprp>`.

        Raises:
            :exc:`KeyError` if an OK response lacks the queue key.
            :exc:`ValueError` if the response status is not OK.
        """
        show_request = make_nlprp_request(command=NlprpCommands.SHOW_QUEUE)
        request_json = to_json_str(show_request)
        json_response = self._post_get_json(request_json, may_fail=False)

        status = json_response[NlprpKeys.STATUS]
        if status != HttpStatus.OK:
            # Is this the right error to raise?
            raise ValueError(f"Response status was: {status}")
        try:
            queue = json_response[NlprpKeys.QUEUE]
        except KeyError:
            log.error(f"Response did not contain key {NlprpKeys.QUEUE!r}.")
            raise
        return queue

    def _post_delete_from_queue(
        self, command_args: Dict[str, Any]
    ) -> Response:
        """
        Sends an NLPRP ``delete_from_queue`` command with the given command
        arguments and checks the HTTP status of the reply.

        A 404 (Not Found) status is logged as a warning, since the request(s)
        may already have been cancelled. Any status other than 200 (OK),
        204 (No Content) or 404 raises :exc:`HTTPError`.

        Returns:
            the raw HTTP response (only if its status was acceptable).
        """
        delete_request = make_nlprp_request(
            command=NlprpCommands.DELETE_FROM_QUEUE,
            command_args=command_args,
        )
        request_json = to_json_str(delete_request)
        response = self._post(request_json, may_fail=False)
        # The GATE server-side doesn't (always) send back JSON for this, so
        # we inspect the HTTP status code rather than the body.
        # todo: ... should it? We're sending to an NLPRP server, so it should?
        status = response.status_code
        if status == HttpStatus.NOT_FOUND:
            log.warning(
                "Queued request(s) not found. May have been cancelled "
                "already."
            )
        elif status not in (HttpStatus.OK, HttpStatus.NO_CONTENT):
            raise HTTPError(f"Response status was: {status}")
        return response

    def delete_all_from_queue(self) -> None:
        """
        Delete ALL pending requests from the server's queue. Use with caution.
        """
        self._post_delete_from_queue({NlprpKeys.DELETE_ALL: True})

    def delete_from_queue(self, queue_ids: List[str]) -> None:
        """
        Delete pending requests from the server's queue for queue_ids
        specified.
        """
        response = self._post_delete_from_queue(
            {NlprpKeys.QUEUE_IDS: queue_ids}
        )
        # NOTE(review): only this method (not delete_all_from_queue) stores
        # the response cookies; confirm whether replacing self.cookies with
        # this response's cookies is intended.
        self.cookies = response.cookies