Coverage for nlp_webserver/views.py: 48%

290 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2026-01-08 09:25 -0600

1r""" 

2crate_anon/nlp_webserver/views.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26Pyramid views making up the CRATE NLPRP web server. 

27 

28""" 

29 

30from contextlib import contextmanager 

31import datetime 

32import logging 

33import json 

34from typing import Dict, Generator, List, Optional, Tuple, Any 

35import redis 

36 

37from cardinal_pythonlib.httpconst import HttpStatus 

38from cardinal_pythonlib.json_utils.typing_helpers import ( 

39 JsonArrayType, 

40 JsonObjectType, 

41 JsonValueType, 

42) 

43from cardinal_pythonlib.sqlalchemy.core_query import fetch_all_first_values 

44from celery.result import AsyncResult, ResultSet 

45from pyramid.view import view_config, view_defaults 

46from pyramid.request import Request 

47from sqlalchemy.exc import SQLAlchemyError 

48from sqlalchemy.sql.expression import and_, ClauseElement, select 

49import transaction 

50 

51from crate_anon.common.constants import JSON_SEPARATORS_COMPACT 

52from crate_anon.nlp_webserver.security import ( 

53 check_password, 

54 get_auth_credentials, 

55 encrypt_password, 

56) 

57 

58# from crate_anon.common.profiling import do_cprofile 

59from crate_anon.nlprp.api import ( 

60 json_get_array, 

61 json_get_array_of_str, 

62 json_get_bool, 

63 json_get_str, 

64 json_get_toplevel_args, 

65 json_get_value, 

66 pendulum_to_nlprp_datetime, 

67) 

68from crate_anon.nlprp.constants import ( 

69 NlprpCommands, 

70 NlprpKeys, 

71 NlprpValues, 

72) 

73from crate_anon.nlprp.errors import ( 

74 BAD_REQUEST, 

75 INTERNAL_SERVER_ERROR, 

76 key_missing_error, 

77 NlprpError, 

78 mkerror, 

79 NOT_FOUND, 

80 UNAUTHORIZED, 

81) 

82from crate_anon.nlprp.version import NLPRP_VERSION_STRING 

83from crate_anon.nlp_webserver.manage_users import get_users 

84from crate_anon.nlp_webserver.models import ( 

85 dbsession, 

86 Document, 

87 DocProcRequest, 

88 make_unique_id, 

89) 

90from crate_anon.nlp_webserver.server_processor import ServerProcessor 

91from crate_anon.nlp_webserver.constants import ( 

92 SERVER_NAME, 

93 SERVER_VERSION, 

94 NlpServerConfigKeys, 

95) 

96from crate_anon.nlp_webserver.tasks import ( 

97 celery_app, 

98 process_nlp_text, 

99 process_nlp_text_immediate, 

100 TaskSession, 

101 start_task_session, 

102) 

103from crate_anon.nlp_webserver.settings import SETTINGS 

104 

log = logging.getLogger(__name__)


# =============================================================================
# Debugging settings
# =============================================================================

# When True, the full body of every incoming NLPRP request is logged (see
# handle_nlprp_request). Request bodies include the document text sent for
# processing, so this must stay False in production.
DEBUG_SHOW_REQUESTS = False

if DEBUG_SHOW_REQUESTS:
    # Make it obvious in the logs that a debug option is switched on.
    log.warning("Debugging options enabled! Turn off for production.")

117 

118 

119# ============================================================================= 

120# Constants 

121# ============================================================================= 

122 

# Name of the HTTP cookie that carries the client's session token.
COOKIE_SESSION_TOKEN = "session_token"

# Fallback Redis connection parameters, used when the server settings do not
# specify them.
DEFAULT_REDIS_HOST = "localhost"
DEFAULT_REDIS_PORT = 6379  # https://redis.io/topics/quickstart
DEFAULT_REDIS_DB_NUMBER = 0  # https://redis.io/commands/select

REDIS_HOST = SETTINGS.get(NlpServerConfigKeys.REDIS_HOST, DEFAULT_REDIS_HOST)
REDIS_PORT = SETTINGS.get(NlpServerConfigKeys.REDIS_PORT, DEFAULT_REDIS_PORT)
REDIS_DB_NUMBER = SETTINGS.get(
    NlpServerConfigKeys.REDIS_DB_NUMBER, DEFAULT_REDIS_DB_NUMBER
)
# A password of None is acceptable to StrictRedis when the Redis server does
# not require authentication.
REDIS_PASSWORD = SETTINGS.get(NlpServerConfigKeys.REDIS_PASSWORD, None)

# Shared Redis client holding one session token per username.
REDIS_SESSIONS = redis.StrictRedis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    db=REDIS_DB_NUMBER,
    password=REDIS_PASSWORD,
)

# How long (seconds) a session token remains valid in Redis after login.
SESSION_TOKEN_EXPIRY_S = 300

146 

147 

148# ============================================================================= 

149# SQLAlchemy context 

150# ============================================================================= 

151 

152 

@contextmanager
def sqla_transaction_commit():
    """
    Context manager that runs the enclosed block and then commits the
    current transaction via the ``transaction`` package.

    If an :exc:`sqlalchemy.exc.SQLAlchemyError` escapes either the block or
    the commit, it is logged at CRITICAL level. The SQLAlchemy session is
    then rolled back, and the NLPRP ``INTERNAL_SERVER_ERROR`` is raised in
    its place.

    Any other exception propagates unchanged; in that case no commit
    happens.
    """
    try:
        yield
        transaction.commit()
    except SQLAlchemyError as sqla_err:
        message = f"SQLAlchemy error: {sqla_err}"
        log.critical(message)
        dbsession.rollback()
        raise INTERNAL_SERVER_ERROR

162 

163 

164# ============================================================================= 

165# NlprpProcessRequest 

166# ============================================================================= 

167 

168 

class NlprpProcessRequest:
    """
    Represents an NLPRP :ref:`process <nlprp_process>` command. Takes the
    request JSON, and offers efficient views on it.

    The requested processors are resolved once, at construction, via
    ``ServerProcessor.get_processor_nlprp`` (from
    :mod:`crate_anon.nlp_webserver.server_processor`).
    """

    def __init__(self, nlprp_request: JsonObjectType) -> None:
        """
        Args:
            nlprp_request: dictionary from the (entire) JSON NLPRP request

        Raises:
            :exc:`NlprpError` for malformed requests
        """
        self.nlprp_request = nlprp_request

        args = json_get_toplevel_args(nlprp_request)

        # The processors being requested. We fetch all of them now, so they
        # can be iterated through fast for each document.
        requested_processors = json_get_array(
            args, NlprpKeys.PROCESSORS, required=True
        )
        self.processors = [
            ServerProcessor.get_processor_nlprp(d)
            for d in requested_processors
        ]

        # Queue the work (True) or process it immediately (False)?
        self.queue = json_get_bool(args, NlprpKeys.QUEUE, default=False)

        # Client job ID (empty string if the client did not supply one)
        self.client_job_id = json_get_str(
            args, NlprpKeys.CLIENT_JOB_ID, default=""
        )

        # Include the source text in the reply?
        self.include_text = json_get_bool(args, NlprpKeys.INCLUDE_TEXT)

        # Content: list of objects (each with text and metadata)
        self.content = json_get_array(args, NlprpKeys.CONTENT, required=True)

    def processor_ids(self) -> List[str]:
        """
        Return the IDs of all requested processors, in request order.
        """
        return [p.processor_id for p in self.processors]

    def processor_ids_jsonstr(self) -> str:
        """
        Returns the IDs of all processors as a compact JSON-encoded array
        string.
        """
        return json.dumps(
            self.processor_ids(), separators=JSON_SEPARATORS_COMPACT
        )

    def gen_text_metadataobj(
        self,
    ) -> Generator[Tuple[str, JsonValueType], None, None]:
        """
        Generates text and metadata pairs from the request, with the metadata
        in JSON object (Python dictionary) format.

        Yields:
            tuple: ``(text, metadata)``; ``metadata`` is ``None`` if the
            document did not supply any

        Raises:
            :exc:`NlprpError` if a document's text is missing or malformed
            (a ``KeyError`` during lookup is translated into an NLPRP
            "key missing" error, so it is reported to the client rather than
            surfacing as an unhandled server error).
        """
        for document in self.content:
            # Only the lookups are guarded, not the yield, so errors raised
            # by the consumer of this generator are never mis-translated.
            try:
                text = json_get_str(document, NlprpKeys.TEXT, required=True)
                metadata = json_get_value(
                    document, NlprpKeys.METADATA, default=None, required=False
                )
            except KeyError:
                raise key_missing_error(key=NlprpKeys.TEXT)
            yield text, metadata

    def gen_text_metadatastr(self) -> Generator[Tuple[str, str], None, None]:
        """
        Generates text and metadata pairs from the request, with the metadata
        in string (compact serialized JSON) format.

        Yields:
            tuple: ``(text, metadata_str)``; a missing metadata object is
            serialized as ``"null"``

        Raises:
            :exc:`NlprpError`: as for :meth:`gen_text_metadataobj`
        """
        for text, metadata in self.gen_text_metadataobj():
            metadata_str = json.dumps(
                metadata, separators=JSON_SEPARATORS_COMPACT
            )
            yield text, metadata_str

265 

266 

267# ============================================================================= 

268# NlpWebViews 

269# ============================================================================= 

270 

271 

@view_defaults(renderer="json")  # all views can now return JsonObjectType
class NlpWebViews:
    """
    Class to provide HTTP views (via Pyramid) for our NLPRP server.
    """

    # -------------------------------------------------------------------------
    # Constructor
    # -------------------------------------------------------------------------

    def __init__(self, request: Request) -> None:
        """
        Args:
            request: a :class:`pyramid.request.Request`
        """
        self.request = request
        # Assigned later, so that a malformed body can be reported to the
        # client as an NLPRP error rather than failing here.
        self.body = None  # type: Optional[JsonObjectType]
        # Username/password supplied by the client (None if absent/invalid).
        self.credentials = get_auth_credentials(self.request)
        # Assigned after successful authentication.
        self.username = None  # type: Optional[str]
        self.password = None  # type: Optional[str]
        # Start database sessions; create_response() removes them again.
        dbsession()
        start_task_session()

    # -------------------------------------------------------------------------
    # Responses and errors
    # -------------------------------------------------------------------------

    def set_http_response_status(self, status: int) -> None:
        """
        Sets the HTTP status code for our response.

        Args:
            status: HTTP status code
        """
        self.request.response.status = status

    def create_response(
        self, status: int, extra_info: Optional[JsonObjectType] = None
    ) -> JsonObjectType:
        """
        Returns a JSON HTTP response with some standard information for a given
        HTTP status and extra information to add to the response.

        Ensures the HTTP status matches the NLPRP JSON status. Also removes
        the database and task sessions started in the constructor.

        Args:
            status: HTTP status code (used both in the header and the JSON)
            extra_info: optional extra keys merged into the response object
        """
        # Put status in HTTP header
        self.set_http_response_status(status)
        response_dict = {
            NlprpKeys.STATUS: status,
            NlprpKeys.PROTOCOL: {
                NlprpKeys.NAME: NlprpValues.NLPRP_PROTOCOL_NAME,
                NlprpKeys.VERSION: NLPRP_VERSION_STRING,
            },
            NlprpKeys.SERVER_INFO: {
                NlprpKeys.NAME: SERVER_NAME,
                NlprpKeys.VERSION: SERVER_VERSION,
            },
        }
        if extra_info is not None:
            response_dict.update(extra_info)
        dbsession.remove()
        TaskSession.remove()
        return response_dict

    def create_error_response(self, error: NlprpError) -> JsonObjectType:
        """
        Returns an HTTP response for a given error and description of the
        error. The NLPRP ``errors`` key is an array; currently it always
        holds exactly one error.
        """
        error_info = {
            NlprpKeys.ERRORS: [
                {
                    NlprpKeys.CODE: error.code,
                    NlprpKeys.MESSAGE: error.message,
                    NlprpKeys.DESCRIPTION: error.description,
                }
            ]
        }
        return self.create_response(error.http_status, error_info)

    # -------------------------------------------------------------------------
    # Security
    # -------------------------------------------------------------------------

    def check_token(self) -> bool:
        """
        Checks whether the session-token cookie sent by the client matches
        the token stored in Redis for the current username.

        Returns:
            ``True`` if the client sent a non-empty token equal to the stored
            one, otherwise ``False``.

        Raises:
            :exc:`redis.exceptions.ConnectionError` if Redis cannot be
            reached (logged before re-raising).
        """
        try:
            redis_token = REDIS_SESSIONS.get(self.username)
        except redis.exceptions.ConnectionError:
            log.critical(
                f"Could not connect to Redis (host={REDIS_HOST!r}, "
                f"port={REDIS_PORT!r}, password not shown)"
            )
            raise
        if redis_token:
            redis_token = redis_token.decode()
        token = self.request.cookies.get(COOKIE_SESSION_TOKEN)
        return bool(token) and token == redis_token

    # -------------------------------------------------------------------------
    # Main view
    # -------------------------------------------------------------------------

    # @do_cprofile
    @view_config(route_name="index")
    def index(self) -> JsonObjectType:
        """
        The top-level "index" view. Passes all the work to
        :meth:`handle_nlprp_request`, converting any :exc:`NlprpError` into
        an NLPRP error response.
        """
        try:
            return self.handle_nlprp_request()
        except NlprpError as error:
            return self.create_error_response(error)

    def _authenticate(self) -> None:
        """
        Authenticates the user, or raises an :exc:`NlprpError`.

        On success, sets ``self.username`` and ``self.password``. If a
        password check (rather than a session token) was used, a fresh
        session token is issued as a cookie and stored in Redis with an
        expiry of ``SESSION_TOKEN_EXPIRY_S`` seconds.
        """
        if self.credentials is None:
            raise mkerror(
                BAD_REQUEST,
                "Credentials were absent or not in the correct format",
            )
        # See if the user exists
        users = get_users()
        self.username = self.credentials.username
        try:
            hashed_pw = users[self.username]
        except KeyError:
            raise UNAUTHORIZED
        pw = self.credentials.password
        if self.check_token():
            # NOTE(review): with a valid session token the supplied password
            # is not verified, yet it is still stored and later passed to
            # the processors -- confirm this is intended.
            self.password = pw
        elif check_password(pw, hashed_pw):
            self.password = pw
            token = make_unique_id()
            self.request.response.set_cookie(
                name=COOKIE_SESSION_TOKEN, value=token
            )
            # Set the value and its expiry in a single (atomic) Redis command,
            # so a token can never be left stored without an expiry.
            REDIS_SESSIONS.set(self.username, token, ex=SESSION_TOKEN_EXPIRY_S)
        else:
            raise UNAUTHORIZED

    def _set_body_json_from_request(self) -> None:
        """
        Get JSON from request if it is a JSON object, otherwise raise an
        :exc:`NlprpError`.
        """
        try:
            body = self.request.json
        except ValueError:
            # json.JSONDecodeError is a ValueError; undecodable body bytes
            # may also raise UnicodeDecodeError, another ValueError.
            body = None
        # Explicit check rather than ``assert``, which is stripped under -O.
        if not isinstance(body, dict):
            raise mkerror(
                BAD_REQUEST,
                "Request body was absent or not in JSON object format",
            )
        self.body = body  # type: JsonObjectType

    def handle_nlprp_request(self) -> JsonObjectType:
        """
        The main function. Authenticates user and checks the request is not
        malformed, then calls the function relating to the command specified
        by the user.
        """
        self._authenticate()
        self._set_body_json_from_request()
        command = json_get_str(self.body, NlprpKeys.COMMAND, required=True)
        log.debug(
            f"NLPRP request received from {self.request.remote_addr}: "
            f"username={self.username}, command={command}"
        )
        if DEBUG_SHOW_REQUESTS:
            log.debug(f"Request: {self.body!r}")
        return self.parse_command(command)

    def parse_command(self, command: str) -> JsonObjectType:
        """
        Dispatch the NLPRP command to its handler.

        Raises:
            :exc:`NlprpError` (bad request) for an unrecognized command.
            Previously such a command produced a ``null`` body with HTTP 200,
            and the sessions were never removed.
        """
        if command == NlprpCommands.LIST_PROCESSORS:
            return self.list_processors()
        elif command == NlprpCommands.PROCESS:
            process_request = NlprpProcessRequest(self.body)
            if process_request.queue:
                return self.put_in_queue(process_request)
            else:
                return self.process_now(process_request)
        elif command == NlprpCommands.SHOW_QUEUE:
            return self.show_queue()
        elif command == NlprpCommands.FETCH_FROM_QUEUE:
            return self.fetch_from_queue()
        elif command == NlprpCommands.DELETE_FROM_QUEUE:
            return self.delete_from_queue()
        raise mkerror(BAD_REQUEST, f"Unknown command: {command!r}")

    # -------------------------------------------------------------------------
    # NLPRP command handlers
    # -------------------------------------------------------------------------

    def list_processors(self) -> JsonObjectType:
        """
        Returns an HTTP response listing the available NLP processors.
        """
        return self.create_response(
            status=HttpStatus.OK,
            extra_info={
                NlprpKeys.PROCESSORS: [
                    proc.infodict
                    for proc in ServerProcessor.processors.values()
                ]
            },
        )

    def process_now(
        self, process_request: NlprpProcessRequest
    ) -> JsonObjectType:
        """
        Processes the text supplied by the user immediately, without putting
        it in the queue.

        Args:
            process_request: a :class:`NlprpProcessRequest`
        """
        results = []  # type: JsonArrayType
        for text, metadata in process_request.gen_text_metadataobj():
            processor_data = []  # type: JsonArrayType
            for processor in process_request.processors:
                # Send the text off for processing
                procresult = process_nlp_text_immediate(
                    text=text,
                    processor=processor,
                    username=self.username,
                    password=self.password,
                )
                # Fill in processor identity if the result lacks it.
                if procresult[NlprpKeys.NAME] is None:
                    procresult[NlprpKeys.NAME] = processor.name
                    procresult[NlprpKeys.TITLE] = processor.title
                    procresult[NlprpKeys.VERSION] = processor.version
                processor_data.append(procresult)

            doc_result = {
                NlprpKeys.METADATA: metadata,
                NlprpKeys.PROCESSORS: processor_data,
            }
            if process_request.include_text:
                doc_result[NlprpKeys.TEXT] = text
            results.append(doc_result)

        response_info = {
            NlprpKeys.CLIENT_JOB_ID: process_request.client_job_id,
            NlprpKeys.RESULTS: results,
        }
        return self.create_response(
            status=HttpStatus.OK, extra_info=response_info
        )

    def put_in_queue(
        self, process_request: NlprpProcessRequest
    ) -> JsonObjectType:
        """
        Puts the document-processor pairs specified by the user into a celery
        queue to be processed.

        Args:
            process_request: a :class:`NlprpProcessRequest`
        """
        # Generate unique queue_id for whole client request
        queue_id = make_unique_id()

        # Encrypt password using reversible encryption for passing to the
        # processors. We must pass the password as a string to the task
        # because it won't let us pass a bytes object.
        crypt_pass = encrypt_password(self.password).decode()

        docprocrequest_ids = []  # type: List[str]
        with transaction.manager:  # one COMMIT for everything inside this
            # Iterate through documents...
            for doctext, metadata in process_request.gen_text_metadatastr():
                doc_id = make_unique_id()
                # PyCharm doesn't like the "deferred" columns, so:
                # noinspection PyArgumentList
                doc = Document(
                    document_id=doc_id,
                    doctext=doctext,
                    client_job_id=process_request.client_job_id,
                    queue_id=queue_id,
                    username=self.username,
                    client_metadata=metadata,
                    include_text=process_request.include_text,
                )
                dbsession.add(doc)  # add to database
                # Iterate through processors...
                for processor in process_request.processors:
                    # The combination of a document and a processor gives us
                    # a docproc.
                    docprocreq_id = make_unique_id()
                    docprocreq = DocProcRequest(
                        docprocrequest_id=docprocreq_id,
                        document_id=doc_id,
                        processor_id=processor.processor_id,
                    )
                    dbsession.add(docprocreq)  # add to database
                    docprocrequest_ids.append(docprocreq_id)

        # Now everything's in the database and committed, we can fire off
        # back-end jobs:
        for dpr_id in docprocrequest_ids:
            process_nlp_text.apply_async(
                # unlike delay(), this allows us to specify task_id, and
                # then the Celery task ID is the same as the DocProcRequest
                # ID.
                args=(dpr_id,),  # docprocrequest_id
                kwargs=dict(username=self.username, crypt_pass=crypt_pass),
                task_id=dpr_id,  # for Celery
            )

        response_info = {NlprpKeys.QUEUE_ID: queue_id}
        return self.create_response(
            status=HttpStatus.ACCEPTED, extra_info=response_info
        )

    def fetch_from_queue(self) -> JsonObjectType:
        """
        Fetches results for all document-processor pairs for the queue_id
        supplied by the user, if all are complete, and then deletes them.
        If some are still in progress, returns a "busy" (202) response with
        progress counts instead.
        """
        # ---------------------------------------------------------------------
        # Args
        # ---------------------------------------------------------------------
        args = json_get_toplevel_args(self.body)
        queue_id = json_get_str(args, NlprpKeys.QUEUE_ID, required=True)

        # ---------------------------------------------------------------------
        # Start with the DocProcRequests, because if some are still busy,
        # we will return a "busy" response.
        # ---------------------------------------------------------------------
        q_dpr = (
            dbsession.query(DocProcRequest)
            .join(Document)
            .filter(Document.username == self.username)
            .filter(Document.queue_id == queue_id)
        )
        q_doc = (
            dbsession.query(Document)
            .filter(Document.username == self.username)
            .filter(Document.queue_id == queue_id)
        )
        dprs = list(q_dpr.all())  # type: List[DocProcRequest]
        if not dprs:
            raise mkerror(NOT_FOUND, "The queue_id given was not found")
        n = len(dprs)
        n_done = sum(dpr.done for dpr in dprs)
        if n_done < n:
            return self.create_response(
                HttpStatus.ACCEPTED,
                {
                    NlprpKeys.N_DOCPROCS: n,
                    NlprpKeys.N_DOCPROCS_COMPLETED: n_done,
                },
            )

        # ---------------------------------------------------------------------
        # Make it easy to look up processors
        # ---------------------------------------------------------------------

        processor_cache = {}  # type: Dict[str, ServerProcessor]

        def get_processor_cached(_processor_id: str) -> ServerProcessor:
            """
            Cache lookups for speed. (All documents will share the same set
            of processors, so there'll be a fair bit of duplication.)
            """
            try:
                return processor_cache[_processor_id]
            except KeyError:
                _processor = ServerProcessor.get_processor_from_id(
                    _processor_id
                )  # may raise
                processor_cache[_processor_id] = _processor
                return _processor

        # ---------------------------------------------------------------------
        # Collect results by document
        # ---------------------------------------------------------------------

        doc_results = []  # type: JsonArrayType
        client_job_id = None  # type: Optional[str]
        # De-duplicate documents while keeping the order of first appearance
        # (a set would give an arbitrary order).
        docs = list(dict.fromkeys(dpr.document for dpr in dprs))
        for doc in docs:
            if client_job_id is None:
                client_job_id = doc.client_job_id
            processor_data = []  # type: JsonArrayType
            # ... data for *all* the processors for this doc
            for dpr in doc.docprocrequests:
                procresult = json.loads(dpr.results)  # type: Dict[str, Any]
                if procresult[NlprpKeys.NAME] is None:
                    processor = get_processor_cached(dpr.processor_id)
                    procresult[NlprpKeys.NAME] = processor.name
                    procresult[NlprpKeys.TITLE] = processor.title
                    procresult[NlprpKeys.VERSION] = processor.version
                processor_data.append(procresult)
            metadata = json.loads(doc.client_metadata)
            doc_result = {
                NlprpKeys.METADATA: metadata,
                NlprpKeys.PROCESSORS: processor_data,
            }
            if doc.include_text:
                doc_result[NlprpKeys.TEXT] = doc.doctext
            doc_results.append(doc_result)

        # ---------------------------------------------------------------------
        # Delete leftovers
        # ---------------------------------------------------------------------

        with sqla_transaction_commit():
            # NOTE(review): a bulk Query.delete() bypasses ORM-level cascades;
            # the DocProcRequests are only removed too if the database FK has
            # ON DELETE CASCADE -- confirm in the models.
            q_doc.delete(synchronize_session=False)

        response_info = {
            NlprpKeys.CLIENT_JOB_ID: (
                client_job_id if client_job_id is not None else ""
            ),
            NlprpKeys.RESULTS: doc_results,
        }
        return self.create_response(
            status=HttpStatus.OK, extra_info=response_info
        )

    # @do_cprofile
    def show_queue(self) -> JsonObjectType:
        """
        Finds the queue entries associated with the client, optionally
        restricted to one client job id.

        Each entry reports its own client job ID (taken from the queue's
        documents, which all share one), its busy/ready status, and its
        submission/completion times.
        """
        args = json_get_toplevel_args(self.body, required=False)
        if args:
            client_job_id = json_get_str(
                args, NlprpKeys.CLIENT_JOB_ID, default="", required=False
            )
        else:
            client_job_id = ""

        # Queue IDs that are of interest
        queue_id_wheres = [
            Document.username == self.username
        ]  # type: List[ClauseElement]
        if client_job_id:
            queue_id_wheres.append(Document.client_job_id == client_job_id)
        # noinspection PyUnresolvedReferences
        queue_ids = fetch_all_first_values(
            dbsession,
            select(Document.queue_id)
            .where(and_(*queue_id_wheres))
            .distinct()
            .order_by(Document.queue_id),
        )  # type: List[str]

        queue_answer = []  # type: JsonArrayType
        for queue_id in queue_ids:
            # DocProcRequest objects that are of interest
            dprs = list(
                dbsession.query(DocProcRequest)
                .join(Document)
                .filter(Document.username == self.username)
                .filter(Document.queue_id == queue_id)
                .all()
            )  # type: List[DocProcRequest]
            if not dprs:
                # Documents exist for this queue but no DocProcRequests (e.g.
                # a request listing no processors). Previously this crashed
                # in max([]); there is no status to report, so skip it.
                log.warning(
                    f"Queue {queue_id!r} has no DocProcRequests; skipping"
                )
                continue
            busy = not all(dpr.done for dpr in dprs)
            first_doc = dprs[0].document
            if busy:
                dt_completed = None
            else:
                dt_completed = pendulum_to_nlprp_datetime(
                    max(dpr.when_done_utc for dpr in dprs), to_utc=True
                )

            queue_answer.append(
                {
                    NlprpKeys.QUEUE_ID: queue_id,
                    # Report the queue's own job ID, not the filter value
                    # (which is "" when no filter was given).
                    NlprpKeys.CLIENT_JOB_ID: first_doc.client_job_id,
                    NlprpKeys.STATUS: (
                        NlprpValues.BUSY if busy else NlprpValues.READY
                    ),
                    NlprpKeys.DATETIME_SUBMITTED: pendulum_to_nlprp_datetime(
                        first_doc.datetime_submitted_pendulum, to_utc=True
                    ),
                    NlprpKeys.DATETIME_COMPLETED: dt_completed,
                }
            )
        return self.create_response(
            status=HttpStatus.OK, extra_info={NlprpKeys.QUEUE: queue_answer}
        )

    def delete_from_queue(self) -> JsonObjectType:
        """
        Deletes from the queue all entries specified by the client: either
        everything belonging to the user (``delete_all``) or the entries
        matching the given client job IDs. Matching Celery tasks are revoked
        first.
        """
        args = json_get_toplevel_args(self.body)
        delete_all = json_get_bool(args, NlprpKeys.DELETE_ALL, default=False)
        client_job_ids = json_get_array_of_str(args, NlprpKeys.CLIENT_JOB_IDS)

        # Establish what to cancel/delete
        q_dpr = (
            dbsession.query(DocProcRequest)
            .join(Document)
            .filter(Document.username == self.username)
        )
        if not delete_all:
            q_dpr = q_dpr.filter(Document.client_job_id.in_(client_job_ids))

        # Remove from Celery queue (cancel ongoing jobs). Celery task IDs
        # equal DocProcRequest IDs (see put_in_queue).
        task_ids_to_cancel = [dpr.docprocrequest_id for dpr in q_dpr.all()]
        if task_ids_to_cancel:
            # Quicker to use ResultSet than revoke them all separately.
            res_set = ResultSet(
                results=[
                    AsyncResult(id=task_id, app=celery_app)
                    for task_id in task_ids_to_cancel
                ],
                app=celery_app,
            )
            log.debug("About to revoke jobs...")
            res_set.revoke()  # will hang if backend not operational
            log.debug("... jobs revoked.")

        q_docs = dbsession.query(Document).filter(
            Document.username == self.username
        )
        if not delete_all:
            q_docs = q_docs.filter(Document.client_job_id.in_(client_job_ids))

        with sqla_transaction_commit():
            # Delete the Document objects. NOTE(review): the DocProcRequests
            # go too only via a database-level ON DELETE CASCADE, since a bulk
            # Query.delete() bypasses ORM cascades -- confirm in the models.
            q_docs.delete(synchronize_session=False)

        # Return response
        return self.create_response(status=HttpStatus.OK)