Coverage for nlp_manager/tests/cloud_request_process_tests.py: 100%

254 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-27 10:34 -0500

1""" 

2crate_anon/nlp_manager/tests/cloud_request_process_tests.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26Unit tests. 

27 

28Reminder: to enable logging, use e.g. pytest -k [testname] --log-cli-level=INFO 

29 

30""" 

31 

32import json 

33import logging 

34import os 

35from pathlib import Path 

36import sys 

37from unittest import mock, TestCase 

38from tempfile import TemporaryDirectory 

39from typing import Any, Dict 

40 

41from cardinal_pythonlib.httpconst import HttpStatus 

42from sqlalchemy.engine import create_engine 

43from sqlalchemy.exc import OperationalError 

44from sqlalchemy.schema import Column 

45from sqlalchemy.sql.expression import text 

46 

47from crate_anon.nlp_manager.all_processors import ( 

48 register_all_crate_python_processors_with_serverprocessor, 

49) 

50from crate_anon.nlp_manager.cloud_parser import Cloud 

51from crate_anon.nlp_manager.cloud_request import ( 

52 CloudRequest, 

53 CloudRequestListProcessors, 

54 CloudRequestProcess, 

55) 

56from crate_anon.nlp_manager.cloud_run_info import CloudRunInfo 

57from crate_anon.nlp_manager.constants import ( 

58 CloudNlpConfigKeys, 

59 DatabaseConfigKeys, 

60 InputFieldConfigKeys, 

61 NLP_CONFIG_ENV_VAR, 

62 NlpConfigPrefixes, 

63 NlpDefConfigKeys, 

64 NlpDefValues, 

65 NlpOutputConfigKeys, 

66 ProcessorConfigKeys, 

67) 

68from crate_anon.nlp_manager.nlp_definition import NlpDefinition 

69from crate_anon.nlp_manager.nlp_manager import drop_remake, process_cloud_now 

70from crate_anon.nlp_webserver.server_processor import ServerProcessor 

71import crate_anon.nlp_webserver.tasks 

72from crate_anon.nlp_webserver.views import NlpWebViews 

73from crate_anon.nlprp.constants import NlprpKeys, NlprpValues 

74 

75log = logging.getLogger(__name__) 

76 

77 

78# ============================================================================= 

79# CloudRequestProcessTests 

80# ============================================================================= 

81 

82 

class CloudRequestProcessTests(TestCase):
    """
    Tests for CloudRequestProcess: writing NLP results via process_all()
    and polling via check_if_ready(). The NLP definition, the processor and
    the database session are all mocks.
    """

    def setUp(self) -> None:
        """
        Creates the mocks shared by the tests, and the CloudRequestProcess
        under test.
        """
        # A session whose execute() calls can be counted, or made to fail.
        self.mock_execute_method = mock.Mock()
        self.mock_session = mock.Mock()
        self.mock_session.execute = self.mock_execute_method
        self.mock_db = mock.Mock()
        self.mock_db.session = self.mock_session

        # A table with one column, "fruit". The name is assigned after
        # construction because the "name" argument to the Mock constructor
        # has a special meaning.
        fruit_column = mock.Mock()
        fruit_column.name = "fruit"

        # table.insert() returns an object whose values() calls we record.
        self.mock_values_method = mock.Mock()
        insert_object = mock.Mock()
        insert_object.values = self.mock_values_method
        sqla_table = mock.Mock()
        sqla_table.columns = [fruit_column]
        sqla_table.insert = mock.Mock(return_value=insert_object)

        # A processor that hands out that table and uses our session.
        self.mock_processor = mock.Mock()
        self.mock_processor.get_table = mock.Mock(return_value=sqla_table)
        self.mock_processor.dest_session = self.mock_session

        # The NLP definition; its notify_transaction() calls are recorded.
        self.mock_notify_transaction_method = mock.Mock()
        self.mock_nlpdef = mock.Mock()
        self.mock_nlpdef.notify_transaction = (
            self.mock_notify_transaction_method
        )
        self.mock_nlpdef.name = "fruitdef"

        self.process = CloudRequestProcess(nlpdef=self.mock_nlpdef)

    # -------------------------------------------------------------------------
    # Helpers
    # -------------------------------------------------------------------------

    def _process_all_with(self, nlp_values) -> None:
        """
        Calls process_all() with gen_nlp_values() replaced by a mock that
        returns an iterator over ``nlp_values``.
        """
        fake_gen_nlp_values = mock.Mock(return_value=iter(nlp_values))
        with mock.patch.object(
            self.process, "gen_nlp_values", fake_gen_nlp_values
        ):
            self.process.process_all()

    def _ready_given_response(self, response) -> bool:
        """
        Sets a queue ID, makes _try_fetch() return ``response``, and returns
        the result of check_if_ready().
        """
        self.process.queue_id = "queue_0001"
        with mock.patch.object(
            self.process, "_try_fetch", return_value=response
        ):
            return self.process.check_if_ready()

    # -------------------------------------------------------------------------
    # process_all()
    # -------------------------------------------------------------------------

    def test_process_all_inserts_values(self) -> None:
        """
        Three NLP values should give three values()/execute() calls and
        three notify_transaction() calls, one per value.
        """
        fruits = ("apple", "banana", "fig")
        self._process_all_with(
            [
                ("output", {"fruit": fruit}, self.mock_processor)
                for fruit in fruits
            ]
        )

        for fruit in fruits:
            self.mock_values_method.assert_any_call({"fruit": fruit})
        self.assertEqual(self.mock_values_method.call_count, len(fruits))
        self.assertEqual(self.mock_execute_method.call_count, len(fruits))

        for fruit in fruits:
            self.mock_notify_transaction_method.assert_any_call(
                self.mock_session,
                n_rows=1,
                n_bytes=sys.getsizeof({"fruit": fruit}),
                force_commit=mock.ANY,
            )
        self.assertEqual(
            self.mock_notify_transaction_method.call_count, len(fruits)
        )

    def test_process_all_handles_failed_insert(self) -> None:
        """
        If execute() raises OperationalError, an error is logged by the
        cloud_request module and notify_transaction() is still called.
        """
        self.mock_execute_method.side_effect = OperationalError(
            "Insert failed", None, None, None
        )

        with self.assertLogs(level=logging.ERROR) as logging_cm:
            self._process_all_with(
                [("output", {"fruit": "apple"}, self.mock_processor)]
            )

        self.mock_notify_transaction_method.assert_any_call(
            self.mock_session,
            n_rows=1,
            n_bytes=sys.getsizeof({"fruit": "apple"}),
            force_commit=mock.ANY,
        )

        first_message = logging_cm.output[0]
        logger_name = "crate_anon.nlp_manager.cloud_request"
        self.assertIn(f"ERROR:{logger_name}", first_message)
        self.assertIn("Insert failed", first_message)

    # -------------------------------------------------------------------------
    # check_if_ready()
    # -------------------------------------------------------------------------

    def test_not_ready_if_queue_id_is_none(self) -> None:
        """
        With no queue ID, check_if_ready() is false and a warning is logged.
        """
        self.process.queue_id = None
        with self.assertLogs(level=logging.WARNING) as logging_cm:
            ready = self.process.check_if_ready()

        self.assertFalse(ready)
        self.assertIn(
            "Tried to fetch from queue before sending request.",
            logging_cm.output[0],
        )

    def test_not_ready_if_fetched(self) -> None:
        """
        With the _fetched flag already set, check_if_ready() is false.
        """
        self.process.queue_id = "queue_0001"
        self.process._fetched = True

        self.assertFalse(self.process.check_if_ready())

    def test_not_ready_if_no_response(self) -> None:
        """
        If _try_fetch() returns None, check_if_ready() is false.
        """
        self.assertFalse(self._ready_given_response(None))

    def test_ready_for_status_ok(self) -> None:
        """
        A response with status OK means ready.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.OK,
            NlprpKeys.VERSION: "0.3.0",
        }
        self.assertTrue(self._ready_given_response(response))

    def test_not_ready_when_old_server_status_processing(self) -> None:
        """
        A version 0.2.0 response with status PROCESSING means not ready.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.PROCESSING,
            NlprpKeys.VERSION: "0.2.0",
        }
        self.assertFalse(self._ready_given_response(response))

    def test_not_ready_when_new_server_status_accepted(self) -> None:
        """
        A version 0.3.0 response with status ACCEPTED means not ready.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.ACCEPTED,
            NlprpKeys.VERSION: "0.3.0",
        }
        self.assertFalse(self._ready_given_response(response))

    def test_not_ready_when_server_status_not_found(self) -> None:
        """
        Status NOT_FOUND means not ready, with a warning naming code 404.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.NOT_FOUND,
            NlprpKeys.VERSION: "0.3.0",
        }
        with self.assertLogs(level=logging.WARNING) as logging_cm:
            ready = self._ready_given_response(response)

        self.assertFalse(ready)
        self.assertIn("Got HTTP status code 404", logging_cm.output[0])

    def test_not_ready_when_server_status_anything_else(self) -> None:
        """
        Another status (here FORBIDDEN) means not ready, with a warning
        naming code 403.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.FORBIDDEN,
            NlprpKeys.VERSION: "0.3.0",
        }
        with self.assertLogs(level=logging.WARNING) as logging_cm:
            ready = self._ready_given_response(response)

        self.assertFalse(ready)
        self.assertIn("Got HTTP status code 403", logging_cm.output[0])

275 

276 

277# ============================================================================= 

278# CloudRequestListProcessorsTests 

279# ============================================================================= 

280 

# Processor information for a "smoking" processor, taken from a real server
# response that wasn't working (2024-12-16), with keys parameterized and
# boolean values Pythonized.
TEST_REMOTE_TABLE_SMOKING = "Smoking:Smoking"
TEST_SMOKING_PROC_NAME = "smoking"
TEST_SMOKING_PROC_VERSION = "0.1"
TEST_SMOKING_PROCINFO = {
    NlprpKeys.DESCRIPTION: "A description",
    NlprpKeys.IS_DEFAULT_VERSION: True,
    NlprpKeys.NAME: TEST_SMOKING_PROC_NAME,
    NlprpKeys.SCHEMA_TYPE: NlprpValues.TABULAR,
    NlprpKeys.SQL_DIALECT: "mssql",
    NlprpKeys.TABULAR_SCHEMA: {
        # One column description per row of the table below.
        TEST_REMOTE_TABLE_SMOKING: [
            {
                NlprpKeys.COLUMN_NAME: column_name,
                NlprpKeys.COLUMN_TYPE: column_type,
                NlprpKeys.DATA_TYPE: data_type,
                NlprpKeys.IS_NULLABLE: is_nullable,
            }
            for column_name, column_type, data_type, is_nullable in (
                # name, column type, data type, nullable?
                ("start_", "BIGINT", "BIGINT", False),
                ("end_", "BIGINT", "BIGINT", False),
                ("who", "NVARCHAR(255)", "NVARCHAR", True),
                ("rule", "VARCHAR(50)", "VARCHAR", True),
                ("status", "VARCHAR(10)", "VARCHAR", True),
            )
        ]
    },
    NlprpKeys.TITLE: "Smoking Status Annotator",
    NlprpKeys.VERSION: TEST_SMOKING_PROC_VERSION,
}

329 

330 

class CloudRequestListProcessorsTests(TestCase):
    """
    Tests for CloudRequestListProcessors.get_remote_processors(), feeding it
    canned server responses by patching _post_get_json().
    """

    def setUp(self) -> None:
        """
        Creates the CloudRequestListProcessors under test, with a mock NLP
        definition.
        """
        self.mock_nlpdef = mock.Mock(name="mock_nlpdef")
        self.mock_nlpdef.name = "testlistprocdef"
        self.process = CloudRequestListProcessors(nlpdef=self.mock_nlpdef)
        self.test_version = "0.3.0"

    def _base_response(self) -> Dict[str, Any]:
        """
        Returns a fresh response containing only the status and version
        keys; tests add (or deliberately omit) the processor list.
        """
        return {
            NlprpKeys.STATUS: HttpStatus.ACCEPTED,
            NlprpKeys.VERSION: self.test_version,
        }

    def _get_remote_processors_given(self, response: Dict[str, Any]) -> Any:
        """
        Calls get_remote_processors() with _post_get_json() patched to
        return ``response``, and returns the result.
        """
        with mock.patch.object(
            self.process, "_post_get_json", return_value=response
        ):
            return self.process.get_remote_processors()

    def test_processors_key_missing(self) -> None:
        """
        A response with no processor list raises KeyError.
        """
        response = self._base_response()
        # Missing: NlprpKeys.PROCESSORS
        with self.assertRaises(KeyError):
            self._get_remote_processors_given(response)

    def test_processors_not_list(self) -> None:
        """
        A processor "list" that is not a list raises ValueError.
        """
        response = self._base_response()
        response[NlprpKeys.PROCESSORS] = "XXX"  # not a list
        with self.assertRaises(ValueError):
            self._get_remote_processors_given(response)

    def test_procinfo_not_dict(self) -> None:
        """
        A processor entry that is not a dictionary raises ValueError.
        """
        procinfo = "xxx"  # not a dict
        response = self._base_response()
        response[NlprpKeys.PROCESSORS] = [procinfo]
        with self.assertRaises(ValueError):
            self._get_remote_processors_given(response)

    def test_procinfo_missing_keys(self) -> None:
        """
        Omitting any one of the mandatory processor keys raises KeyError.
        """
        mandatory_keys = (
            NlprpKeys.NAME,
            NlprpKeys.TITLE,
            NlprpKeys.VERSION,
            NlprpKeys.DESCRIPTION,
        )
        base_procinfo = {k: "x" for k in mandatory_keys}
        for key in mandatory_keys:
            # subTest: a failure reports which key was omitted, and the
            # remaining keys are still checked.
            with self.subTest(missing_key=key):
                procinfo = base_procinfo.copy()
                del procinfo[key]
                response = self._base_response()
                response[NlprpKeys.PROCESSORS] = [procinfo]
                with self.assertRaises(KeyError):
                    self._get_remote_processors_given(response)

    def test_procinfo_smoking(self) -> None:
        """
        The "smoking" processor information (TEST_SMOKING_PROCINFO) is
        accepted without error.
        """
        response = self._base_response()
        response[NlprpKeys.PROCESSORS] = [TEST_SMOKING_PROCINFO]
        try:
            self._get_remote_processors_given(response)
            # Should be happy.
        finally:
            # Clean up, even if the call above failed part-way, so that the
            # test processor cannot linger for other tests.
            ServerProcessor.debug_remove_processor(
                name=TEST_SMOKING_PROC_NAME, version=TEST_SMOKING_PROC_VERSION
            )

412 

413 

414# ============================================================================= 

415# CloudRequestDataTests 

416# ============================================================================= 

417 

418 

class CloudRequestDataTests(TestCase):
    """
    Tests that connect the cloud NLP client code to the NLP web server code
    in-process: client requests are handed directly to an NlpWebViews
    instance (with authentication mocked out), using a temporary on-disk
    SQLite database and a temporary NLP config file.
    """

    def setUp(self) -> None:
        """
        Creates the source database, the config file, the in-process server
        and the clients. Everything that alters global state (a class
        attribute of CloudRequest, os.environ) is registered for automatic
        restoration when the test finishes.
        """
        # On-disk database
        self.tempdir = TemporaryDirectory()
        # Registered first so that it runs last (cleanups run in reverse
        # order), i.e. after the engines we own have been disposed of.
        self.addCleanup(self._cleanup_tempdir)
        self.dbfilepath = Path(self.tempdir.name, "test.sqlite")
        log.info(f"Using temporary database: {self.dbfilepath}")
        self.dburl = f"sqlite:///{self.dbfilepath.absolute()}"
        self.txttable = "notes"
        self.pkcol = "pk"
        self.txtcol = "note"
        self.echo = False
        self._mk_test_data()

        # Dummy database
        self.dummy_engine = create_engine("sqlite://", future=True)
        self.addCleanup(self.dummy_engine.dispose)

        # Config file
        self.nlpdefname = "mynlp"
        self.dbsectionname = "mydb"
        self.cloudconfigname = "mycloud"
        self.inputname = "myinput"
        self.cloudclassname = "Cloud"
        self.cloudproc_crp = "proc_crp"
        self.cloudproc_alcohol = "proc_alcohol"
        self.output_crp = "crp_output"
        self.output_alcohol = "alcohol_output"
        self.configfilepath = Path(self.tempdir.name, "crate_test_nlp.ini")
        with open(self.configfilepath, "wt") as f:
            configtext = self._mk_nlp_config()
            log.debug(configtext)
            f.write(configtext)

        # Server side
        register_all_crate_python_processors_with_serverprocessor()
        self.mock_pyramid_request = mock.Mock(name="mock_pyramid_request")
        self.server = NlpWebViews(request=self.mock_pyramid_request)
        self.server._authenticate = mock.Mock()
        self.server._set_body_json_from_request = mock.Mock(
            name="mock_set_body_json_from_request"
        )

        # Rather than modify the instances, we modify the class. This is
        # because process_now() does its own instance creation.
        # (CloudRequest is the base class of CloudRequestProcess and
        # CloudRequestListProcessors.)
        # Patching (rather than plain assignment) means the original method
        # is restored afterwards, so other tests see an unmodified class.
        post_patcher = mock.patch.object(
            CloudRequest, "_post_get_json", self._get_server_response
        )
        post_patcher.start()
        self.addCleanup(post_patcher.stop)

        # Client side #1
        self.mock_nlpdef = mock.Mock()
        self.mock_nlpdef.name = "testdef"
        self.listprocclient = CloudRequestListProcessors(
            nlpdef=self.mock_nlpdef
        )

        # Client side #2
        # Point the config environment variable at our file for the duration
        # of this test only; the previous environment is restored afterwards.
        env_patcher = mock.patch.dict(
            os.environ,
            {NLP_CONFIG_ENV_VAR: str(self.configfilepath.absolute())},
        )
        env_patcher.start()
        self.addCleanup(env_patcher.stop)
        self.nlpdef = NlpDefinition(self.nlpdefname)  # loads the config
        self.crinfo = CloudRunInfo(nlpdef=self.nlpdef)

    def _cleanup_tempdir(self) -> None:
        """
        Removes the temporary directory explicitly, rather than leaving it
        to the garbage collector. This is best-effort: the SQLite file may
        still be held open by objects this test does not own, which can
        prevent deletion on some platforms, so a failure is logged rather
        than turned into a test error.
        """
        try:
            self.tempdir.cleanup()
        except OSError as exc:
            log.warning(
                f"Could not remove temporary directory "
                f"{self.tempdir.name}: {exc}"
            )

    def _mk_nlp_config(self) -> str:
        """
        Returns a test NLP config file.
        """
        return f"""
# NLP definitions

[{NlpConfigPrefixes.NLPDEF}:{self.nlpdefname}]
{NlpDefConfigKeys.INPUTFIELDDEFS} =
    {self.inputname}
{NlpDefConfigKeys.PROCESSORS} =
    {self.cloudclassname} {self.cloudproc_crp}
    {self.cloudclassname} {self.cloudproc_alcohol}
{NlpDefConfigKeys.PROGRESSDB} = {self.dbsectionname}
{NlpDefConfigKeys.HASHPHRASE} = blah
{NlpDefConfigKeys.CLOUD_CONFIG} = {self.cloudconfigname}
{NlpDefConfigKeys.CLOUD_REQUEST_DATA_DIR} = {self.tempdir.name}

# Inputs

[{NlpConfigPrefixes.INPUT}:{self.inputname}]
{InputFieldConfigKeys.SRCDB} = {self.dbsectionname}
{InputFieldConfigKeys.SRCTABLE} = {self.txttable}
{InputFieldConfigKeys.SRCPKFIELD} = {self.pkcol}
{InputFieldConfigKeys.SRCFIELD} = {self.txtcol}

# Processors

# - CRP
[{NlpConfigPrefixes.PROCESSOR}:{self.cloudproc_crp}]
{ProcessorConfigKeys.PROCESSOR_NAME} = crate_anon.nlp_manager.parse_biochemistry.Crp
{ProcessorConfigKeys.PROCESSOR_FORMAT} = {NlpDefValues.FORMAT_STANDARD}
{ProcessorConfigKeys.OUTPUTTYPEMAP} =
    crp {self.output_crp}
{ProcessorConfigKeys.DESTDB} = {self.dbsectionname}

# - Alcohol units
[{NlpConfigPrefixes.PROCESSOR}:{self.cloudproc_alcohol}]
{ProcessorConfigKeys.PROCESSOR_NAME} = crate_anon.nlp_manager.parse_substance_misuse.AlcoholUnits
{ProcessorConfigKeys.PROCESSOR_FORMAT} = {NlpDefValues.FORMAT_STANDARD}
{ProcessorConfigKeys.OUTPUTTYPEMAP} =
    alcoholunits {self.output_alcohol}
{ProcessorConfigKeys.DESTDB} = {self.dbsectionname}

# Output sections

# - CRP
[{NlpConfigPrefixes.OUTPUT}:{self.output_crp}]
{NlpOutputConfigKeys.DESTTABLE} = nlp_crp

# - Alcohol units
[{NlpConfigPrefixes.OUTPUT}:{self.output_alcohol}]
{NlpOutputConfigKeys.DESTTABLE} = nlp_alcohol

# Databases

[{NlpConfigPrefixes.DATABASE}:{self.dbsectionname}]
{DatabaseConfigKeys.URL} = {self.dburl}
{DatabaseConfigKeys.ECHO} = {self.echo}

# Cloud servers

[{NlpConfigPrefixes.CLOUD}:{self.cloudconfigname}]
{CloudNlpConfigKeys.CLOUD_URL} = https://dummy_url

"""  # noqa: E501

    def _mk_test_data(self) -> None:
        """
        Creates the source text table in the on-disk database and inserts
        some test data into it.
        """
        texts = ["Current CRP 7. Teetotal. Non-smoker."]
        sql_create = text(
            f"""
            CREATE TABLE {self.txttable} (
                {self.pkcol} INTEGER,
                {self.txtcol} TEXT
            )
            """
        )
        sql_insert = text(
            f"""
            INSERT INTO {self.txttable}
                ({self.pkcol}, {self.txtcol})
            VALUES(:a, :b)
            """
        )
        engine = create_engine(self.dburl, future=True)
        try:
            with engine.begin() as con:
                con.execute(sql_create)
                for i, txt in enumerate(texts, start=1):
                    con.execute(sql_insert, dict(a=i, b=txt))
        finally:
            # Release this engine's connections to the database file; the
            # engine is not used again.
            engine.dispose()

    # noinspection PyUnusedLocal
    def _get_server_response(
        self, request_json_str: str, may_fail: bool = None
    ) -> Dict[str, Any]:
        """
        Take a JSON request that has come from our mock client (in string
        form), and return a JSON response from our mock server (in dictionary
        form).

        This stands in for CloudRequest._post_get_json(); ``may_fail`` is
        accepted for signature compatibility but ignored.
        """
        request_json = json.loads(request_json_str)
        log.debug(f"{request_json=}")
        self.server.body = request_json
        response_json = self.server.index()
        log.debug(f"-> {response_json=}")
        return response_json

    def _run_cloud_pipeline(self, use_dict_format: bool) -> None:
        """
        Rebuilds the destination tables and runs the cloud NLP pipeline,
        with the server's USE_DICT_FORMAT_NLPRP_RESULT flag temporarily set
        to ``use_dict_format``. The previous value of the flag is restored
        afterwards, even if the pipeline raises.
        """
        with mock.patch.object(
            crate_anon.nlp_webserver.tasks,
            "USE_DICT_FORMAT_NLPRP_RESULT",
            use_dict_format,
        ):
            drop_remake(nlpdef=self.nlpdef)
            process_cloud_now(crinfo=self.crinfo)

    def test_get_remote_processor_columns(self) -> None:
        """
        Check that a client can request processor definitions from the server
        (testing both ends, just simplifying some communication between them,
        e.g. removing authentication). Check that the client can synthesise
        SQLAlchemy Column objects from the results.
        """
        # Fetch the data
        processors = self.listprocclient.get_remote_processors()
        # Check it
        self.assertIsInstance(processors, list)
        for sp in processors:
            self.assertIsInstance(sp, ServerProcessor)
            log.debug(f"+++ Trying {sp.name=}")

            # We won't go so far as to set up a mock database in full. But
            # check that Column objects are created.
            # (a) setup
            c = Cloud(nlpdef=None, cfg_processor_name=None)
            c.procname = sp.name
            c._destdb = mock.Mock(name="mock_destdb")
            c._destdb.engine = self.dummy_engine
            c.set_procinfo_if_correct(sp)
            log.debug(f"--- {c.schema=}")
            for tablename in c.schema.keys():
                ouc = mock.Mock(name=f"mock_ouc_{tablename}")
                ouc.get_columns = lambda _engine: []
                ouc.renames = {}
                ouc.dest_tablename = tablename  # usually a property
                ouc.destfields = []
                # ... simulating OutputUserConfig
                c._outputtypemap[tablename] = ouc
                c._type_to_tablename[tablename] = tablename
            # (b) test
            self.assertTrue(c.is_tabular())
            table_columns = c.dest_tables_columns()
            self.assertGreater(len(table_columns), 0)
            for tablename, columns in table_columns.items():
                log.debug(f"--- {sp.name=}: {tablename=}; {columns=}")
                self.assertGreater(len(columns), 0)
                for col in columns:
                    self.assertIsInstance(col, Column)

    def test_cloud_pipeline_dict_format(self) -> None:
        """
        Test the full pipeline, with the server returning results in
        dictionary format:

        - create a source database (in setUp);
        - build a config file (in setUp);
        - create destination tables, based on remote processor definitions
          using tabular_schema;
        - run data through cloud NLP, and insert results.

        """
        self._run_cloud_pipeline(use_dict_format=True)
        # The test is (currently) that it doesn't crash.

        # To explore the database manually: import pdb; pdb.set_trace()

    def test_cloud_pipeline_list_format(self) -> None:
        """
        Test the full pipeline, with the server returning results in list
        format:

        - create a source database (in setUp);
        - build a config file (in setUp);
        - create destination tables, based on remote processor definitions
          using tabular_schema;
        - run data through cloud NLP, and insert results.

        """
        self._run_cloud_pipeline(use_dict_format=False)
        # The test is (currently) that it doesn't crash.