Coverage for nlp_manager/cloud_request_sender.py: 99%

131 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-27 10:34 -0500

1""" 

2crate_anon/nlp_manager/cloud_request_sender.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**CloudRequestSender class.** 

27 

28""" 

29 

30# ============================================================================= 

31# Imports 

32# ============================================================================= 

33 

34from enum import auto, Enum 

35import logging 

36from typing import ( 

37 Any, 

38 Dict, 

39 List, 

40 Optional, 

41 Tuple, 

42 Generator, 

43 TYPE_CHECKING, 

44) 

45 

46from crate_anon.nlp_manager.constants import ( 

47 DEFAULT_REPORT_EVERY_NLP, 

48) 

49from crate_anon.nlp_manager.input_field_config import ( 

50 InputFieldConfig, 

51 FN_SRCDB, 

52 FN_SRCTABLE, 

53 FN_SRCPKFIELD, 

54 FN_SRCPKVAL, 

55 FN_SRCPKSTR, 

56 FN_SRCFIELD, 

57) 

58from crate_anon.nlp_manager.models import FN_SRCHASH 

59from crate_anon.nlp_manager.cloud_request import ( 

60 CloudRequestProcess, 

61 RecordNotPrintable, 

62 RecordsPerRequestExceeded, 

63 RequestTooLong, 

64) 

65from crate_anon.nlp_manager.cloud_run_info import CloudRunInfo 

66 

67if TYPE_CHECKING: 

68 from http.cookiejar import CookieJar 

69 

70log = logging.getLogger(__name__) 

71 

72 

73# ============================================================================= 

74# CloudRequestSender 

75# ============================================================================= 

76 

77 

class CloudRequestSender:
    """
    Class to encapsulate a NLP request outbound to a cloud NLP server.

    Records are pulled one at a time from a text generator and packed into
    :class:`crate_anon.nlp_manager.cloud_request.CloudRequestProcess`
    objects. A small state machine (see :class:`State`) alternates between
    building a request and sending it, until either the generator is
    exhausted or the "records before commit" limit is reached.
    """

    class State(Enum):
        """
        Request state.
        """

        BUILDING_REQUEST = auto()  # adding records to the current request
        SENDING_REQUEST = auto()  # current request is ready to be sent
        FINISHED = auto()  # nothing more to do in this block

    def __init__(
        self,
        text_generator: Generator[Tuple[str, Dict[str, Any]], None, None],
        crinfo: CloudRunInfo,
        ifconfig: InputFieldConfig,
        report_every: int = DEFAULT_REPORT_EVERY_NLP,
        incremental: bool = False,
        queue: bool = True,
    ) -> None:
        """
        Initialise class

        Args:
            text_generator:
                Generator that generates text strings from the source
                database. See
                :meth:`crate_anon.nlp_manager.input_field_config.InputFieldConfig.gen_text`.
            crinfo:
                A :class:`crate_anon.nlp_manager.cloud_run_info.CloudRunInfo`
                object.
            ifconfig:
                An
                :class:`crate_anon.nlp_manager.input_field_config.InputFieldConfig`
                object.
            report_every:
                Report to the log every *n* records (a false value, e.g. 0,
                disables this progress report).
            incremental:
                Process in incremental mode (ignoring source records that have
                not changed since last time)?
            queue:
                Queue the requests for back-end processing (rather than waiting
                for an immediate reply)?
        """
        self._text_generator = text_generator
        self._crinfo = crinfo
        self._ifconfig = ifconfig
        self._report_every = report_every
        self._incremental = incremental
        self._queue = queue

        # Per-block working state; all of this is reset by send_requests().
        self._global_recnum = -1
        self._requests: List[CloudRequestProcess] = []
        self._cookies: Optional["CookieJar"] = None
        # Ordinal used when logging a sent request; send_requests() restarts
        # it at 1 and it is incremented after each successful send.
        self._request_count = 0
        self._text: Optional[str] = None
        self._other_values: Optional[Dict[str, Any]] = None
        self._request_is_empty = True
        self._need_new_record = True
        self._need_new_request = True
        self._num_recs_processed = 0
        self._state = self.State.BUILDING_REQUEST
        self._request: Optional[CloudRequestProcess] = None

    def send_requests(
        self, global_recnum: int
    ) -> Tuple[List[CloudRequestProcess], bool, int]:
        """
        Sends off a series of cloud requests and returns them as a list.
        ``self._queue`` determines whether these are queued requests or not.

        Args:
            global_recnum:
                Running record number to continue counting from.

        Returns:
            tuple: ``requests, some_records_processed, global_recnum``, where
            ``some_records_processed`` is true if at least one record was
            counted as processed during this call, and ``global_recnum`` is
            the updated running record number. If no remote processors are
            available, returns ``[], False, global_recnum`` without reading
            any records.
        """
        self._global_recnum = global_recnum
        self._requests = []
        self._cookies = None
        self._request_count = 1
        self._text = None
        self._other_values = None
        self._request_is_empty = True
        self._need_new_record = True
        self._need_new_request = True

        # Check processors are available
        available_procs = self._crinfo.get_remote_processors()
        if not available_procs:
            return [], False, self._global_recnum

        self._num_recs_processed = 0
        self._state = self.State.BUILDING_REQUEST

        # If we've reached the limit of records before commit, return to
        # outer function in order to process and commit (or write to file if
        # it's a queued request)
        while self._state != self.State.FINISHED:
            if self._state == self.State.BUILDING_REQUEST:
                self._build_request()

            # Deliberately not "elif": _build_request() may have just
            # switched the state, in which case we send straight away.
            if self._state == self.State.SENDING_REQUEST:
                self._send_request()

        return (
            self._requests,
            self._num_recs_processed > 0,
            self._global_recnum,
        )

    def _build_request(self) -> None:
        """
        Adds another record to the outbound request, until the request is
        fully built. Updates our state to reflect what needs to happen next.
        """
        if self._need_new_record:
            try:
                self._get_next_record()
            except StopIteration:
                self._update_state_for_no_more_records()
                return

            hasher = self._crinfo.nlpdef.hash
            srchash = hasher(self._text)

            if self._incremental and self._record_already_processed(srchash):
                # Skip; _need_new_record is still True, so the next call
                # fetches the following record.
                return

            self._num_recs_processed += 1
            self._other_values[FN_SRCHASH] = srchash

        if self._need_new_request:
            self._request = self._get_new_cloud_request()
            self._request_is_empty = True
            self._need_new_request = False

        # Default assumption: this record is dealt with (added or skipped)
        # below. Only the "didn't fit" branch overrides this.
        self._need_new_record = True

        # Add the text to the cloud request with the appropriate metadata
        try:
            self._request.add_text(self._text, self._other_values)

            # added OK, request now has some text
            self._request_is_empty = False

        except RecordNotPrintable:
            # Text contained no printable characters. Skip it.
            pass
        except (RecordsPerRequestExceeded, RequestTooLong) as e:
            if isinstance(e, RequestTooLong) and self._request_is_empty:
                # Too long even for an empty request, so a fresh request
                # would not help. Get some new text next time
                log.warning("Skipping text that's too long to send")
            else:
                # Try same text again with a fresh request
                self._need_new_record = False
                self._state = self.State.SENDING_REQUEST

        if self._record_limit_reached():
            # NOTE(review): if every record in this request was skipped,
            # this sends a request with no text in it - confirm that
            # send_process_request() copes with that.
            self._state = self.State.SENDING_REQUEST

    def _get_new_cloud_request(self) -> CloudRequestProcess:
        """
        Creates and returns a new
        :class:`crate_anon.nlp_manager.cloud_request.CloudRequestProcess`
        object.
        """
        return CloudRequestProcess(self._crinfo)

    def _update_state_for_no_more_records(self) -> None:
        """
        No more input records are available. This means either (a) we've sent
        all our requests and have finished, or (b) we're building our last
        request and we need to send it. Set the state accordingly.
        """
        if self._request_is_empty or self._need_new_request:
            # Nothing more to send
            self._state = self.State.FINISHED
            return

        # Send last request
        self._state = self.State.SENDING_REQUEST

    def _record_already_processed(self, srchash: str) -> bool:
        """
        Has this source record (identified by its PK and its hash) already been
        processed? (If so, then in incremental mode, we can skip it.)

        Args:
            srchash:
                Hash of the current record's text.

        Returns:
            ``True`` if a progress record exists for the current record's PK
            and its stored hash equals ``srchash``; ``False`` if there is no
            progress record or the hash differs.
        """
        pkval = self._other_values[FN_SRCPKVAL]
        # 'ifconfig.get_progress_record' expects pkstr to be None if it's
        # empty
        pkstr = self._other_values[FN_SRCPKSTR] or None
        progrec = self._ifconfig.get_progress_record(pkval, pkstr)
        if progrec is not None:
            if progrec.srchash == srchash:
                log.debug("Record previously processed; skipping")
                return True

            log.debug("Record has changed")
        else:
            log.debug("Record is new")

        return False

    def _record_limit_reached(self) -> bool:
        """
        Have we processed as many records as we're allowed before we should
        COMMIT to the database?
        """
        limit_before_commit = self._crinfo.cloudcfg.limit_before_commit
        return self._num_recs_processed >= limit_before_commit

    def _get_next_record(self) -> None:
        """
        Reads the next text record and metadata into ``self._text`` and
        ``self._other_values``, increments the running record number, and
        periodically logs progress.

        Raises:
            :exc:`StopIteration` if there are no more records
        """
        self._text, self._other_values = next(self._text_generator)
        self._global_recnum += 1

        if (
            self._report_every
            and self._global_recnum % self._report_every == 0
        ):
            pkval = self._other_values[FN_SRCPKVAL]
            pkstr = self._other_values[FN_SRCPKSTR]
            # total number of records in table
            totalcount = self._ifconfig.get_count()
            log.info(
                "Processing {db}.{t}.{c}, PK: {pkf}={pkv} "
                "(record {g_recnum}/{totalcount})".format(
                    db=self._other_values[FN_SRCDB],
                    t=self._other_values[FN_SRCTABLE],
                    c=self._other_values[FN_SRCFIELD],
                    pkf=self._other_values[FN_SRCPKFIELD],
                    # Show the string PK if there is one, else the integer
                    pkv=pkstr if pkstr else pkval,
                    g_recnum=self._global_recnum,
                    totalcount=totalcount,
                )
            )

    def _send_request(self) -> None:
        """
        Send a pending request to the remote NLP server.
        Update the state afterwards.
        """
        self._request.send_process_request(
            queue=self._queue,
            cookies=self._cookies,
            include_text_in_reply=self._crinfo.cloudcfg.has_gate_processors,
        )
        # If there's a connection error, we only get this far if we
        # didn't choose to stop at failure
        if self._request.request_failed:
            log.warning("Continuing after failed request.")
        else:
            # Only successful requests are counted and returned.
            if self._request.cookies:
                self._cookies = self._request.cookies
            log.info(
                f"Sent request to be processed: #{self._request_count} "
                f"of this block"
            )
            self._request_count += 1
            self._requests.append(self._request)

        # _build_request() leaves _need_new_record False when the current
        # record did not fit into the request we have just sent. That record
        # has already been counted, and send_requests() discards it on the
        # next call, so finishing now would lose it. Give it one more round
        # in a fresh request. Requiring the sent request to have been
        # non-empty guarantees termination: the retry starts from an empty
        # request, so it cannot qualify for a further retry.
        retry_pending_record = (
            not self._need_new_record and not self._request_is_empty
        )
        if self._record_limit_reached() and not retry_pending_record:
            self._state = self.State.FINISHED
            return

        self._state = self.State.BUILDING_REQUEST
        self._need_new_request = True

353 self._need_new_request = True