Coverage for nlp_manager/cloud_request_sender.py: 99%
131 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
1"""
2crate_anon/nlp_manager/cloud_request_sender.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**CloudRequestSender class.**
28"""
30# =============================================================================
31# Imports
32# =============================================================================
34from enum import auto, Enum
35import logging
36from typing import (
37 Any,
38 Dict,
39 List,
40 Optional,
41 Tuple,
42 Generator,
43 TYPE_CHECKING,
44)
46from crate_anon.nlp_manager.constants import (
47 DEFAULT_REPORT_EVERY_NLP,
48)
49from crate_anon.nlp_manager.input_field_config import (
50 InputFieldConfig,
51 FN_SRCDB,
52 FN_SRCTABLE,
53 FN_SRCPKFIELD,
54 FN_SRCPKVAL,
55 FN_SRCPKSTR,
56 FN_SRCFIELD,
57)
58from crate_anon.nlp_manager.models import FN_SRCHASH
59from crate_anon.nlp_manager.cloud_request import (
60 CloudRequestProcess,
61 RecordNotPrintable,
62 RecordsPerRequestExceeded,
63 RequestTooLong,
64)
65from crate_anon.nlp_manager.cloud_run_info import CloudRunInfo
67if TYPE_CHECKING:
68 from http.cookiejar import CookieJar
70log = logging.getLogger(__name__)
73# =============================================================================
74# CloudRequestSender
75# =============================================================================
class CloudRequestSender:
    """
    Class to encapsulate a NLP request outbound to a cloud NLP server.

    Records are pulled from a text generator, packed into
    :class:`crate_anon.nlp_manager.cloud_request.CloudRequestProcess`
    objects, and sent. The work is driven by a small state machine (see
    :class:`State`): we alternate between building a request and sending it
    until either the source records run out or the per-commit record limit
    is reached.
    """

    class State(Enum):
        """
        Request state.
        """

        BUILDING_REQUEST = auto()  # adding records to the current request
        SENDING_REQUEST = auto()  # current request is ready to be sent
        FINISHED = auto()  # nothing more to do for this block

    def __init__(
        self,
        text_generator: Generator[Tuple[str, Dict[str, Any]], None, None],
        crinfo: CloudRunInfo,
        ifconfig: InputFieldConfig,
        report_every: int = DEFAULT_REPORT_EVERY_NLP,
        incremental: bool = False,
        queue: bool = True,
    ) -> None:
        """
        Initialise class

        Args:
            text_generator:
                Generator that generates text strings from the source
                database. See
                :meth:`crate_anon.nlp_manager.input_field_config.InputFieldConfig.gen_text`.
            crinfo:
                A :class:`crate_anon.nlp_manager.cloud_run_info.CloudRunInfo`
                object.
            ifconfig:
                An
                :class:`crate_anon.nlp_manager.input_field_config.InputFieldConfig`
                object.
            report_every:
                Report to the log every *n* requests.
            incremental:
                Process in incremental mode (ignoring source records that have
                not changed since last time)?
            queue:
                Queue the requests for back-end processing (rather than waiting
                for an immediate reply)?
        """
        self._text_generator = text_generator
        self._crinfo = crinfo
        self._ifconfig = ifconfig
        self._report_every = report_every
        self._incremental = incremental
        self._queue = queue

        self._global_recnum = -1
        self._requests = []  # type: List[CloudRequestProcess]
        self._cookies = None  # type: Optional[CookieJar]
        # Ordinal used when logging a sent request; reset to 1 at the start
        # of every send_requests() call and incremented after each
        # successful send.
        self._request_count = 0
        self._text = None  # type: Optional[str]
        self._other_values = None  # type: Optional[Dict[str, Any]]
        self._request_is_empty = True
        self._need_new_record = True
        self._need_new_request = True
        self._num_recs_processed = 0
        self._state = self.State.BUILDING_REQUEST
        self._request = None  # type: Optional[CloudRequestProcess]

    def send_requests(
        self, global_recnum: int
    ) -> Tuple[List[CloudRequestProcess], bool, int]:
        """
        Sends off a series of cloud requests and returns them as a list.
        ``self._queue`` determines whether these are queued requests or not.
        Also returns whether any source records were processed during this
        call, and the updated global record number.

        Args:
            global_recnum:
                The running record number to continue counting from.

        Return tuple is: ``requests, some_records_processed, global_recnum``.
        If no remote processors are available, returns
        ``[], False, global_recnum`` without reading any records.
        """
        self._global_recnum = global_recnum
        self._requests = []
        self._cookies = None
        self._request_count = 1
        self._text = None
        self._other_values = None
        self._request_is_empty = True
        self._need_new_record = True
        self._need_new_request = True

        # Check processors are available
        available_procs = self._crinfo.get_remote_processors()
        if not available_procs:
            return [], False, self._global_recnum

        self._num_recs_processed = 0
        self._state = self.State.BUILDING_REQUEST

        # If we've reached the limit of records before commit, return to
        # outer function in order to process and commit (or write to file if
        # it's a queued request)
        while self._state != self.State.FINISHED:
            if self._state == self.State.BUILDING_REQUEST:
                self._build_request()

            # Deliberately "if", not "elif": a request that has just been
            # completed is sent in the same pass through the loop.
            if self._state == self.State.SENDING_REQUEST:
                self._send_request()

        return (
            self._requests,
            self._num_recs_processed > 0,
            self._global_recnum,
        )

    def _build_request(self) -> None:
        """
        Adds another record to the outbound request, until the request is
        fully built. Updates our state to reflect what needs to happen next.
        """
        if self._need_new_record:
            try:
                self._get_next_record()
            except StopIteration:
                self._update_state_for_no_more_records()
                return

            hasher = self._crinfo.nlpdef.hash
            srchash = hasher(self._text)

            if self._incremental and self._record_already_processed(srchash):
                # Skip; we stay in BUILDING_REQUEST and fetch another record
                # next time round.
                return

            # Counted once per record, here, so that a record which has to
            # be retried with a fresh request is not counted twice.
            self._num_recs_processed += 1
            self._other_values[FN_SRCHASH] = srchash

        if self._need_new_request:
            self._request = self._get_new_cloud_request()
            self._request_is_empty = True
            self._need_new_request = False

        # Default assumption: this record will be dealt with (added or
        # deliberately skipped) below, so the next pass should fetch another.
        self._need_new_record = True

        # Add the text to the cloud request with the appropriate metadata
        try:
            self._request.add_text(self._text, self._other_values)

            # added OK, request now has some text
            self._request_is_empty = False

        except RecordNotPrintable:
            # Text contained no printable characters. Skip it.
            pass
        except (RecordsPerRequestExceeded, RequestTooLong) as e:
            if isinstance(e, RequestTooLong) and self._request_is_empty:
                # The text does not fit even in an empty request, so it can
                # never be sent. Get some new text next time.
                log.warning("Skipping text that's too long to send")
            else:
                # Try same text again with a fresh request
                self._need_new_record = False
                self._state = self.State.SENDING_REQUEST

        if self._record_limit_reached():
            if self._request_is_empty:
                # Nothing to send. As in _update_state_for_no_more_records(),
                # don't dispatch a request that contains no text.
                self._state = self.State.FINISHED
            else:
                self._state = self.State.SENDING_REQUEST

    def _get_new_cloud_request(self) -> CloudRequestProcess:
        """
        Creates and returns a new
        :class:`crate_anon.nlp_manager.cloud_request.CloudRequestProcess`
        object.
        """
        return CloudRequestProcess(self._crinfo)

    def _update_state_for_no_more_records(self) -> None:
        """
        No more input records are available. This means either (a) we've sent
        all our requests and have finished, or (b) we're building our last
        request and we need to send it. Set the state accordingly.
        """
        if self._request_is_empty or self._need_new_request:
            # Nothing more to send
            self._state = self.State.FINISHED
            return

        # Send last request
        self._state = self.State.SENDING_REQUEST

    def _record_already_processed(self, srchash: str) -> bool:
        """
        Has this source record (identified by its PK and its hash) already been
        processed? (If so, then in incremental mode, we can skip it.)

        Args:
            srchash:
                Hash of the current source text.

        Returns:
            ``True`` if a progress record exists for this PK and its stored
            hash equals ``srchash``; ``False`` if the record is new or has
            changed.
        """
        pkval = self._other_values[FN_SRCPKVAL]
        # 'ifconfig.get_progress_record' expects pkstr to be None if it's
        # empty
        pkstr = self._other_values[FN_SRCPKSTR] or None
        progrec = self._ifconfig.get_progress_record(pkval, pkstr)
        if progrec is not None:
            if progrec.srchash == srchash:
                log.debug("Record previously processed; skipping")
                return True

            log.debug("Record has changed")
        else:
            log.debug("Record is new")

        return False

    def _record_limit_reached(self) -> bool:
        """
        Have we processed as many records as we're allowed before we should
        COMMIT to the database?
        """
        limit_before_commit = self._crinfo.cloudcfg.limit_before_commit
        return self._num_recs_processed >= limit_before_commit

    def _get_next_record(self) -> None:
        """
        Reads the next text record and metadata into ``self._text`` and
        ``self._other_values``, advances the global record number, and
        periodically reports progress to the log.

        Raises:
            :exc:`StopIteration` if there are no more records
        """
        self._text, self._other_values = next(self._text_generator)
        self._global_recnum += 1

        if (
            self._report_every
            and self._global_recnum % self._report_every == 0
        ):
            pkval = self._other_values[FN_SRCPKVAL]
            pkstr = self._other_values[FN_SRCPKSTR]
            # total number of records in table
            totalcount = self._ifconfig.get_count()
            log.info(
                "Processing {db}.{t}.{c}, PK: {pkf}={pkv} "
                "(record {g_recnum}/{totalcount})".format(
                    db=self._other_values[FN_SRCDB],
                    t=self._other_values[FN_SRCTABLE],
                    c=self._other_values[FN_SRCFIELD],
                    pkf=self._other_values[FN_SRCPKFIELD],
                    # Show the string PK when there is one, else the
                    # integer PK.
                    pkv=pkstr if pkstr else pkval,
                    g_recnum=self._global_recnum,
                    totalcount=totalcount,
                )
            )

    def _send_request(self) -> None:
        """
        Send a pending request to the remote NLP server.
        Update the state afterwards.

        A successfully sent request is appended to ``self._requests``; a
        failed one is logged and dropped.
        """
        self._request.send_process_request(
            queue=self._queue,
            cookies=self._cookies,
            include_text_in_reply=self._crinfo.cloudcfg.has_gate_processors,
        )
        # If there's a connection error, we only get this far if we
        # didn't choose to stop at failure
        if self._request.request_failed:
            log.warning("Continuing after failed request.")
        else:
            if self._request.cookies:
                self._cookies = self._request.cookies
            log.info(
                f"Sent request to be processed: #{self._request_count} "
                f"of this block"
            )
            self._request_count += 1
            self._requests.append(self._request)

        # Only stop at the commit limit if no record is waiting to be
        # retried. A record left pending by _build_request() (because it did
        # not fit in the request just sent) has already been counted towards
        # the limit; finishing now would drop it, since send_requests()
        # resets the pending-record state on its next call.
        if self._record_limit_reached() and self._need_new_record:
            self._state = self.State.FINISHED
            return

        self._state = self.State.BUILDING_REQUEST
        self._need_new_request = True