Coverage for nlp_webserver/views.py: 48%
290 statements
« prev ^ index » next coverage.py v7.8.0, created at 2026-01-08 09:25 -0600
« prev ^ index » next coverage.py v7.8.0, created at 2026-01-08 09:25 -0600
1r"""
2crate_anon/nlp_webserver/views.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26Pyramid views making up the CRATE NLPRP web server.
28"""
30from contextlib import contextmanager
31import datetime
32import logging
33import json
34from typing import Dict, Generator, List, Optional, Tuple, Any
35import redis
37from cardinal_pythonlib.httpconst import HttpStatus
38from cardinal_pythonlib.json_utils.typing_helpers import (
39 JsonArrayType,
40 JsonObjectType,
41 JsonValueType,
42)
43from cardinal_pythonlib.sqlalchemy.core_query import fetch_all_first_values
44from celery.result import AsyncResult, ResultSet
45from pyramid.view import view_config, view_defaults
46from pyramid.request import Request
47from sqlalchemy.exc import SQLAlchemyError
48from sqlalchemy.sql.expression import and_, ClauseElement, select
49import transaction
51from crate_anon.common.constants import JSON_SEPARATORS_COMPACT
52from crate_anon.nlp_webserver.security import (
53 check_password,
54 get_auth_credentials,
55 encrypt_password,
56)
58# from crate_anon.common.profiling import do_cprofile
59from crate_anon.nlprp.api import (
60 json_get_array,
61 json_get_array_of_str,
62 json_get_bool,
63 json_get_str,
64 json_get_toplevel_args,
65 json_get_value,
66 pendulum_to_nlprp_datetime,
67)
68from crate_anon.nlprp.constants import (
69 NlprpCommands,
70 NlprpKeys,
71 NlprpValues,
72)
73from crate_anon.nlprp.errors import (
74 BAD_REQUEST,
75 INTERNAL_SERVER_ERROR,
76 key_missing_error,
77 NlprpError,
78 mkerror,
79 NOT_FOUND,
80 UNAUTHORIZED,
81)
82from crate_anon.nlprp.version import NLPRP_VERSION_STRING
83from crate_anon.nlp_webserver.manage_users import get_users
84from crate_anon.nlp_webserver.models import (
85 dbsession,
86 Document,
87 DocProcRequest,
88 make_unique_id,
89)
90from crate_anon.nlp_webserver.server_processor import ServerProcessor
91from crate_anon.nlp_webserver.constants import (
92 SERVER_NAME,
93 SERVER_VERSION,
94 NlpServerConfigKeys,
95)
96from crate_anon.nlp_webserver.tasks import (
97 celery_app,
98 process_nlp_text,
99 process_nlp_text_immediate,
100 TaskSession,
101 start_task_session,
102)
103from crate_anon.nlp_webserver.settings import SETTINGS
log = logging.getLogger(__name__)


# =============================================================================
# Debugging settings
# =============================================================================

# When True, handle_nlprp_request() logs the full body of every request.
DEBUG_SHOW_REQUESTS = False

if DEBUG_SHOW_REQUESTS:
    log.warning("Debugging options enabled! Turn off for production.")


# =============================================================================
# Constants
# =============================================================================

# Name of the cookie that carries a client's session token.
COOKIE_SESSION_TOKEN = "session_token"

# Fallback Redis connection details, used for any option absent from SETTINGS.
DEFAULT_REDIS_HOST = "localhost"
DEFAULT_REDIS_PORT = 6379  # https://redis.io/topics/quickstart
DEFAULT_REDIS_DB_NUMBER = 0  # https://redis.io/commands/select

REDIS_HOST = SETTINGS.get(NlpServerConfigKeys.REDIS_HOST, DEFAULT_REDIS_HOST)
REDIS_PORT = SETTINGS.get(NlpServerConfigKeys.REDIS_PORT, DEFAULT_REDIS_PORT)
REDIS_DB_NUMBER = SETTINGS.get(
    NlpServerConfigKeys.REDIS_DB_NUMBER, DEFAULT_REDIS_DB_NUMBER
)
# No password configured -> None, which StrictRedis accepts for servers that
# do not require authentication.
REDIS_PASSWORD = SETTINGS.get(NlpServerConfigKeys.REDIS_PASSWORD, None)

# Redis client used to store each username's current session token.
REDIS_SESSIONS = redis.StrictRedis(
    password=REDIS_PASSWORD,
    db=REDIS_DB_NUMBER,
    port=REDIS_PORT,
    host=REDIS_HOST,
)

# How long (seconds) a session token stays valid in Redis.
SESSION_TOKEN_EXPIRY_S = 300
148# =============================================================================
149# SQLAlchemy context
150# =============================================================================
@contextmanager
def sqla_transaction_commit():
    """
    Context manager that runs the enclosed block and then commits the current
    ``transaction``.

    If the block or the commit raises :exc:`SQLAlchemyError`, the error is
    logged, the current ``transaction`` is aborted, ``dbsession`` is rolled
    back, and the NLPRP ``INTERNAL_SERVER_ERROR`` is raised instead (with the
    original SQLAlchemy exception as its cause).

    Raises:
        the NLPRP ``INTERNAL_SERVER_ERROR`` on any SQLAlchemy failure
    """
    try:
        yield
        transaction.commit()
    except SQLAlchemyError as e:
        log.critical(f"SQLAlchemy error: {e}")
        # A transaction whose commit failed must be aborted before the
        # (thread-local) transaction manager can be used again; otherwise
        # later work on this thread would hit a doomed transaction.
        transaction.abort()
        dbsession.rollback()
        raise INTERNAL_SERVER_ERROR from e
164# =============================================================================
165# NlprpProcessRequest
166# =============================================================================
class NlprpProcessRequest:
    """
    Represents an NLPRP :ref:`process <nlprp_process>` command. Takes the
    request JSON, and offers efficient views on it.

    Requested processors are resolved up front via
    :meth:`ServerProcessor.get_processor_nlprp`.
    """

    def __init__(self, nlprp_request: JsonObjectType) -> None:
        """
        Args:
            nlprp_request: dictionary from the (entire) JSON NLPRP request

        Raises:
            :exc:`NlprpError` for malformed requests
        """
        self.nlprp_request = nlprp_request

        args = json_get_toplevel_args(nlprp_request)

        # The processors being requested. We fetch all of them now, so they
        # can be iterated through fast for each document.
        requested_processors = json_get_array(
            args, NlprpKeys.PROCESSORS, required=True
        )
        self.processors = [
            ServerProcessor.get_processor_nlprp(d)
            for d in requested_processors
        ]  # type: List[ServerProcessor]

        # Queue the work (True) or process it immediately (False)?
        self.queue = json_get_bool(args, NlprpKeys.QUEUE, default=False)

        # Client job ID; "" if the client did not supply one.
        self.client_job_id = json_get_str(
            args, NlprpKeys.CLIENT_JOB_ID, default=""
        )

        # Include the source text in the reply?
        self.include_text = json_get_bool(args, NlprpKeys.INCLUDE_TEXT)

        # Content: list of objects (each with text and metadata)
        self.content = json_get_array(args, NlprpKeys.CONTENT, required=True)

    def processor_ids(self) -> List[str]:
        """
        Return the IDs of all requested processors, in request order.
        """
        return [p.processor_id for p in self.processors]

    def processor_ids_jsonstr(self) -> str:
        """
        Returns the IDs of all processors as a compact JSON-encoded array
        string.
        """
        return json.dumps(
            self.processor_ids(), separators=JSON_SEPARATORS_COMPACT
        )

    def gen_text_metadataobj(
        self,
    ) -> Generator[Tuple[str, JsonValueType], None, None]:
        """
        Generates text and metadata pairs from the request, with the metadata
        in JSON object (Python dictionary) format.

        The ``text`` key is required for each document; ``metadata`` is
        optional and defaults to ``None``.

        Yields:
            tuple: ``(text, metadata)``, as above
        """
        for document in self.content:
            text = json_get_str(document, NlprpKeys.TEXT, required=True)
            metadata = json_get_value(
                document, NlprpKeys.METADATA, default=None, required=False
            )
            yield text, metadata

    def gen_text_metadatastr(self) -> Generator[Tuple[str, str], None, None]:
        """
        Generates text and metadata pairs from the request, with the metadata
        in string (serialized, compact JSON) format.

        Reuses :meth:`gen_text_metadataobj` so both views parse documents the
        same way.

        Yields:
            tuple: ``(text, metadata)``, as above

        Raises:
            the error built by ``key_missing_error`` for the ``text`` key, if
            a :exc:`KeyError` arises while reading a document
        """
        try:
            for text, metadata in self.gen_text_metadataobj():
                metadata_str = json.dumps(
                    metadata, separators=JSON_SEPARATORS_COMPACT
                )
                yield text, metadata_str
        except KeyError:
            raise key_missing_error(key=NlprpKeys.TEXT)
267# =============================================================================
268# NlpWebViews
269# =============================================================================
@view_defaults(renderer="json")  # all views can now return JsonObjectType
class NlpWebViews:
    """
    Class to provide HTTP views (via Pyramid) for our NLPRP server.

    Pyramid constructs an instance with the incoming request; the ``index``
    route authenticates the user and dispatches on the NLPRP ``command`` in
    the JSON body.
    """

    # -------------------------------------------------------------------------
    # Constructor
    # -------------------------------------------------------------------------

    def __init__(self, request: Request) -> None:
        """
        Args:
            request: a :class:`pyramid.request.Request`
        """
        self.request = request
        # Assign this later so we can return error to client if problem
        self.body = None  # type: Optional[JsonObjectType]
        # Get username and password
        self.credentials = get_auth_credentials(self.request)
        # Assign these later after authentication
        self.username = None  # type: Optional[str]
        self.password = None  # type: Optional[str]
        # Start database sessions (removed again in create_response)
        dbsession()
        start_task_session()

    # -------------------------------------------------------------------------
    # Responses and errors
    # -------------------------------------------------------------------------

    def set_http_response_status(self, status: int) -> None:
        """
        Sets the HTTP status code for our response.

        Args:
            status: HTTP status code
        """
        self.request.response.status = status

    def create_response(
        self, status: int, extra_info: Optional[JsonObjectType] = None
    ) -> JsonObjectType:
        """
        Returns a JSON HTTP response with some standard information for a given
        HTTP status and extra information to add to the response.

        Ensures the HTTP status matches the NLPRP JSON status. Also removes the
        database and task sessions started in the constructor.

        Args:
            status: HTTP status code, used for both the HTTP header and the
                NLPRP ``status`` key
            extra_info: optional extra top-level keys to merge into the reply

        Returns:
            the reply, as a JSON object (dictionary)
        """
        # Put status in HTTP header
        self.set_http_response_status(status)
        response_dict = {
            NlprpKeys.STATUS: status,
            NlprpKeys.PROTOCOL: {
                NlprpKeys.NAME: NlprpValues.NLPRP_PROTOCOL_NAME,
                NlprpKeys.VERSION: NLPRP_VERSION_STRING,
            },
            NlprpKeys.SERVER_INFO: {
                NlprpKeys.NAME: SERVER_NAME,
                NlprpKeys.VERSION: SERVER_VERSION,
            },
        }  # type: JsonObjectType
        if extra_info is not None:
            response_dict.update(extra_info)
        dbsession.remove()
        TaskSession.remove()
        return response_dict

    def create_error_response(self, error: NlprpError) -> JsonObjectType:
        """
        Returns an HTTP response for a given error and description of the
        error. The HTTP status is taken from ``error.http_status``.
        """
        # Turned 'errors' into array
        # Should this allow for multiple errors?
        error_info = {
            NlprpKeys.ERRORS: [
                {
                    NlprpKeys.CODE: error.code,
                    NlprpKeys.MESSAGE: error.message,
                    NlprpKeys.DESCRIPTION: error.description,
                }
            ]
        }
        return self.create_response(error.http_status, error_info)

    # -------------------------------------------------------------------------
    # Security
    # -------------------------------------------------------------------------

    def check_token(self) -> bool:
        """
        Checks to see if the user has given the correct token for the current
        session connected to their username.

        Returns:
            ``True`` if the request's session cookie matches the token stored
            in Redis for :attr:`username`, otherwise ``False``.

        Raises:
            redis.exceptions.ConnectionError: if Redis cannot be reached
        """
        from hmac import compare_digest

        try:
            redis_token = REDIS_SESSIONS.get(self.username)
        except redis.exceptions.ConnectionError:
            log.critical(
                f"Could not connect to Redis (host={REDIS_HOST!r}, "
                f"port={REDIS_PORT!r}, password not shown)"
            )
            raise
        if not redis_token:
            return False
        token = self.request.cookies.get(COOKIE_SESSION_TOKEN)
        if not token:
            return False
        # Constant-time comparison, so response timing does not reveal how
        # much of a guessed token was correct. Redis returns bytes.
        return compare_digest(token.encode("utf-8"), redis_token)

    # -------------------------------------------------------------------------
    # Main view
    # -------------------------------------------------------------------------

    # @do_cprofile
    @view_config(route_name="index")
    def index(self) -> JsonObjectType:
        """
        The top-level "index" view. Passes all the work to
        :meth:`handle_nlprp_request`, except for error handling: any
        :exc:`NlprpError` becomes an NLPRP error response.
        """
        try:
            return self.handle_nlprp_request()
        except NlprpError as error:
            return self.create_error_response(error)

    def _authenticate(self) -> None:
        """
        Authenticates the user, or raise an :exc:`NlprpError`.

        A valid session-token cookie is accepted in place of a password check.
        Otherwise the password is checked against the stored hash and, on
        success, a new session token is issued via cookie and stored in Redis
        with an expiry of ``SESSION_TOKEN_EXPIRY_S`` seconds.
        """
        if self.credentials is None:
            raise mkerror(
                BAD_REQUEST,
                "Credentials were absent or not in the correct format",
            )
        # See if the user exists
        users = get_users()
        self.username = self.credentials.username
        try:
            hashed_pw = users[self.username]
        except KeyError:
            raise UNAUTHORIZED
        # Check if password is correct
        pw = self.credentials.password
        if self.check_token():
            self.password = pw
        elif check_password(pw, hashed_pw):
            self.password = pw
            token = make_unique_id()
            self.request.response.set_cookie(
                name=COOKIE_SESSION_TOKEN, value=token
            )
            # Set value and expiry in one atomic command, so the token can
            # never be left in Redis without an expiry.
            REDIS_SESSIONS.set(self.username, token, ex=SESSION_TOKEN_EXPIRY_S)
        else:
            raise UNAUTHORIZED

    def _set_body_json_from_request(self) -> None:
        """
        Get JSON from request if it is in this form, otherwise raise an
        :exc:`NlprpError`.

        The body must decode to a JSON object (dictionary).
        """
        try:
            body = self.request.json
        except ValueError:
            # Covers json.JSONDecodeError (absent/invalid JSON) and
            # UnicodeDecodeError (undecodable body); both subclass ValueError.
            body = None
        # Explicit check rather than "assert", which is stripped under -O.
        if not isinstance(body, dict):
            raise mkerror(
                BAD_REQUEST,
                "Request body was absent or not in JSON object format",
            )
        self.body = body  # type: JsonObjectType

    def handle_nlprp_request(self) -> JsonObjectType:
        """
        The main function. Authenticates user and checks the request is not
        malformed, then calls the function relating to the command specified
        by the user.
        """
        self._authenticate()
        self._set_body_json_from_request()
        command = json_get_str(self.body, NlprpKeys.COMMAND, required=True)
        log.debug(
            f"NLPRP request received from {self.request.remote_addr}: "
            f"username={self.username}, command={command}"
        )
        if DEBUG_SHOW_REQUESTS:
            log.debug(f"Request: {self.body!r}")
        return self.parse_command(command)

    def parse_command(self, command: str) -> JsonObjectType:
        """
        Parse the NLPRP command and dispatch to its handler.

        Raises:
            :exc:`NlprpError` (BAD_REQUEST) for an unrecognized command
        """
        if command == NlprpCommands.LIST_PROCESSORS:
            return self.list_processors()
        elif command == NlprpCommands.PROCESS:
            process_request = NlprpProcessRequest(self.body)
            if process_request.queue:
                return self.put_in_queue(process_request)
            else:
                return self.process_now(process_request)
        elif command == NlprpCommands.SHOW_QUEUE:
            return self.show_queue()
        elif command == NlprpCommands.FETCH_FROM_QUEUE:
            return self.fetch_from_queue()
        elif command == NlprpCommands.DELETE_FROM_QUEUE:
            return self.delete_from_queue()
        # Previously fell through and returned None (HTTP 200, "null" body).
        raise mkerror(BAD_REQUEST, f"Unknown NLPRP command: {command!r}")

    # -------------------------------------------------------------------------
    # NLPRP command handlers
    # -------------------------------------------------------------------------

    def list_processors(self) -> JsonObjectType:
        """
        Returns an HTTP response listing the available NLP processors.
        """
        return self.create_response(
            status=HttpStatus.OK,
            extra_info={
                NlprpKeys.PROCESSORS: [
                    proc.infodict
                    for proc in ServerProcessor.processors.values()
                ]
            },
        )

    def process_now(
        self, process_request: NlprpProcessRequest
    ) -> JsonObjectType:
        """
        Processes the text supplied by the user immediately, without putting
        it in the queue.

        Args:
            process_request: a :class:`NlprpProcessRequest`
        """
        results = []  # type: JsonArrayType
        for text, metadata in process_request.gen_text_metadataobj():
            processor_data = []  # type: JsonArrayType
            for processor in process_request.processors:
                # Send the text off for processing
                procresult = process_nlp_text_immediate(
                    text=text,
                    processor=processor,
                    username=self.username,
                    password=self.password,
                )
                # Fill in processor identity if the result lacks it (a missing
                # key is treated like an explicit None).
                if procresult.get(NlprpKeys.NAME) is None:
                    procresult[NlprpKeys.NAME] = processor.name
                    procresult[NlprpKeys.TITLE] = processor.title
                    procresult[NlprpKeys.VERSION] = processor.version
                processor_data.append(procresult)

            doc_result = {
                NlprpKeys.METADATA: metadata,
                NlprpKeys.PROCESSORS: processor_data,
            }
            if process_request.include_text:
                doc_result[NlprpKeys.TEXT] = text
            results.append(doc_result)

        response_info = {
            NlprpKeys.CLIENT_JOB_ID: process_request.client_job_id,
            NlprpKeys.RESULTS: results,
        }
        return self.create_response(
            status=HttpStatus.OK, extra_info=response_info
        )

    def put_in_queue(
        self, process_request: NlprpProcessRequest
    ) -> JsonObjectType:
        """
        Puts the document-processor pairs specified by the user into a celery
        queue to be processed.

        Documents and DocProcRequests are committed to the database first;
        only then are the Celery tasks submitted (one per DocProcRequest,
        using the DocProcRequest ID as the Celery task ID).

        Args:
            process_request: a :class:`NlprpProcessRequest`

        Returns:
            an HTTP 202 response containing the new queue_id
        """
        # Generate unique queue_id for whole client request
        queue_id = make_unique_id()

        # Encrypt password using reversible encryption for passing to the
        # processors.
        # We must pass the password as a string to the task because it won't
        # let us pass a bytes object
        crypt_pass = encrypt_password(self.password).decode()

        docprocrequest_ids = []  # type: List[str]
        with transaction.manager:  # one COMMIT for everything inside this
            # Iterate through documents...
            for doctext, metadata in process_request.gen_text_metadatastr():
                doc_id = make_unique_id()
                # PyCharm doesn't like the "deferred" columns, so:
                # noinspection PyArgumentList
                doc = Document(
                    document_id=doc_id,
                    doctext=doctext,
                    client_job_id=process_request.client_job_id,
                    queue_id=queue_id,
                    username=self.username,
                    client_metadata=metadata,
                    include_text=process_request.include_text,
                )
                dbsession.add(doc)  # add to database
                # Iterate through processors...
                for processor in process_request.processors:
                    # The combination of a document and a processor gives us
                    # a docproc.
                    docprocreq_id = make_unique_id()
                    docprocreq = DocProcRequest(
                        docprocrequest_id=docprocreq_id,
                        document_id=doc_id,
                        processor_id=processor.processor_id,
                    )
                    dbsession.add(docprocreq)  # add to database
                    docprocrequest_ids.append(docprocreq_id)

        # Now everything's in the database and committed, we can fire off
        # back-end jobs:
        for dpr_id in docprocrequest_ids:
            process_nlp_text.apply_async(
                # unlike delay(), this allows us to specify task_id, and
                # then the Celery task ID is the same as the DocProcRequest
                # ID.
                args=(dpr_id,),  # docprocrequest_id
                kwargs=dict(username=self.username, crypt_pass=crypt_pass),
                task_id=dpr_id,  # for Celery
            )

        response_info = {NlprpKeys.QUEUE_ID: queue_id}
        return self.create_response(
            status=HttpStatus.ACCEPTED, extra_info=response_info
        )

    def fetch_from_queue(self) -> JsonObjectType:
        """
        Fetches requests for all document-processor pairs for the queue_id
        supplied by the user (if complete).

        Returns:
            - HTTP 202 with progress counts if any pair is still busy;
            - otherwise HTTP 200 with per-document results, after deleting the
              queue's documents (and their DocProcRequests).

        Raises:
            :exc:`NlprpError` (NOT_FOUND) if this user has no such queue_id
        """
        # ---------------------------------------------------------------------
        # Args
        # ---------------------------------------------------------------------
        args = json_get_toplevel_args(self.body)
        queue_id = json_get_str(args, NlprpKeys.QUEUE_ID, required=True)

        # ---------------------------------------------------------------------
        # Start with the DocProcRequests, because if some are still busy,
        # we will return a "busy" response.
        # ---------------------------------------------------------------------
        q_dpr = (
            dbsession.query(DocProcRequest)
            .join(Document)
            .filter(Document.username == self.username)
            .filter(Document.queue_id == queue_id)
        )
        q_doc = (
            dbsession.query(Document)
            .filter(Document.username == self.username)
            .filter(Document.queue_id == queue_id)
        )
        dprs = list(q_dpr.all())  # type: List[DocProcRequest]
        if not dprs:
            raise mkerror(NOT_FOUND, "The queue_id given was not found")
        n = len(dprs)
        n_done = sum(dpr.done for dpr in dprs)
        if n_done < n:  # still busy
            return self.create_response(
                HttpStatus.ACCEPTED,
                {
                    NlprpKeys.N_DOCPROCS: n,
                    NlprpKeys.N_DOCPROCS_COMPLETED: n_done,
                },
            )

        # ---------------------------------------------------------------------
        # Make it easy to look up processors
        # ---------------------------------------------------------------------

        processor_cache = {}  # type: Dict[str, ServerProcessor]

        def get_processor_cached(_processor_id: str) -> ServerProcessor:
            """
            Cache lookups for speed. (All documents will share the same set
            of processors, so there'll be a fair bit of duplication.)
            """
            try:
                return processor_cache[_processor_id]
            except KeyError:
                _processor = ServerProcessor.get_processor_from_id(
                    _processor_id
                )  # may raise
                processor_cache[_processor_id] = _processor
                return _processor

        # ---------------------------------------------------------------------
        # Collect results by document
        # ---------------------------------------------------------------------

        doc_results = []  # type: JsonArrayType
        client_job_id = None  # type: Optional[str]
        # Unique documents in first-seen order (a set gave arbitrary order).
        docs = list(dict.fromkeys(dpr.document for dpr in dprs))
        for doc in docs:
            if client_job_id is None:
                client_job_id = doc.client_job_id
            processor_data = []  # type: JsonArrayType
            # ... data for *all* the processors for this doc
            for dpr in doc.docprocrequests:
                procresult = json.loads(dpr.results)  # type: Dict[str, Any]
                if procresult.get(NlprpKeys.NAME) is None:
                    processor = get_processor_cached(dpr.processor_id)
                    procresult[NlprpKeys.NAME] = processor.name
                    procresult[NlprpKeys.TITLE] = processor.title
                    procresult[NlprpKeys.VERSION] = processor.version
                processor_data.append(procresult)
            metadata = json.loads(doc.client_metadata)
            doc_result = {
                NlprpKeys.METADATA: metadata,
                NlprpKeys.PROCESSORS: processor_data,
            }
            if doc.include_text:
                doc_result[NlprpKeys.TEXT] = doc.doctext
            doc_results.append(doc_result)

        # ---------------------------------------------------------------------
        # Delete leftovers
        # ---------------------------------------------------------------------

        with sqla_transaction_commit():
            q_doc.delete(synchronize_session=False)
            # ... will also delete the DocProcRequests via a cascade

        response_info = {
            NlprpKeys.CLIENT_JOB_ID: (
                client_job_id if client_job_id is not None else ""
            ),
            NlprpKeys.RESULTS: doc_results,
        }
        return self.create_response(
            status=HttpStatus.OK, extra_info=response_info
        )

    # @do_cprofile
    def show_queue(self) -> JsonObjectType:
        """
        Finds the queue entries associated with the client, optionally
        restricted to one client job id.

        Each entry reports its queue_id, the client_job_id stored with its
        documents, busy/ready status, submission time and (if ready)
        completion time.
        """
        args = json_get_toplevel_args(self.body, required=False)
        if args:
            client_job_id = json_get_str(
                args, NlprpKeys.CLIENT_JOB_ID, default="", required=False
            )
        else:
            client_job_id = ""

        # Queue IDs that are of interest
        queue_id_wheres = [
            Document.username == self.username
        ]  # type: List[ClauseElement]
        if client_job_id:
            queue_id_wheres.append(Document.client_job_id == client_job_id)
        # noinspection PyUnresolvedReferences
        queue_ids = fetch_all_first_values(
            dbsession,
            select(Document.queue_id)
            .where(and_(*queue_id_wheres))
            .distinct()
            .order_by(Document.queue_id),
        )  # type: List[str]

        queue_answer = []  # type: JsonArrayType
        for queue_id in queue_ids:
            # DocProcRequest objects that are of interest
            dprs = list(
                dbsession.query(DocProcRequest)
                .join(Document)
                .filter(Document.username == self.username)
                .filter(Document.queue_id == queue_id)
                .all()
            )  # type: List[DocProcRequest]
            if not dprs:
                # Documents with no DocProcRequests (presumably possible if a
                # request named no processors). fetch_from_queue reports such
                # a queue as not found, so skip it rather than crash below.
                log.warning(f"Queue {queue_id!r} has no DocProcRequests")
                continue
            first_doc = dprs[0].document
            busy = not all(dpr.done for dpr in dprs)
            if busy:
                dt_completed = None
            else:
                dt_completed = pendulum_to_nlprp_datetime(
                    max(dpr.when_done_utc for dpr in dprs), to_utc=True
                )

            queue_answer.append(
                {
                    NlprpKeys.QUEUE_ID: queue_id,
                    # Report the queue's own client_job_id, not the (possibly
                    # empty) filter value supplied in the request.
                    NlprpKeys.CLIENT_JOB_ID: first_doc.client_job_id or "",
                    NlprpKeys.STATUS: (
                        NlprpValues.BUSY if busy else NlprpValues.READY
                    ),
                    NlprpKeys.DATETIME_SUBMITTED: pendulum_to_nlprp_datetime(
                        first_doc.datetime_submitted_pendulum, to_utc=True
                    ),
                    NlprpKeys.DATETIME_COMPLETED: dt_completed,
                }
            )
        return self.create_response(
            status=HttpStatus.OK, extra_info={NlprpKeys.QUEUE: queue_answer}
        )

    def delete_from_queue(self) -> JsonObjectType:
        """
        Deletes from the queue all entries specified by the client: either
        all of this user's entries (``delete_all``) or those whose
        client_job_id is in ``client_job_ids``. Matching Celery tasks are
        revoked first.
        """
        args = json_get_toplevel_args(self.body)
        delete_all = json_get_bool(args, NlprpKeys.DELETE_ALL, default=False)
        client_job_ids = json_get_array_of_str(args, NlprpKeys.CLIENT_JOB_IDS)

        # Establish what to cancel/delete
        q_dpr = (
            dbsession.query(DocProcRequest)
            .join(Document)
            .filter(Document.username == self.username)
        )
        if not delete_all:
            q_dpr = q_dpr.filter(Document.client_job_id.in_(client_job_ids))

        # Remove from Celery queue (cancel ongoing jobs). Skip entirely when
        # nothing matches, so we don't contact the backend needlessly.
        task_ids_to_cancel = [dpr.docprocrequest_id for dpr in q_dpr.all()]
        if task_ids_to_cancel:
            # Quicker to use ResultSet than forget them all separately
            results = [
                AsyncResult(id=task_id, app=celery_app)
                for task_id in task_ids_to_cancel
            ]  # type: List[AsyncResult]
            res_set = ResultSet(results=results, app=celery_app)
            log.debug("About to revoke jobs...")
            res_set.revoke()  # will hang if backend not operational
            log.debug("... jobs revoked.")

        q_docs = dbsession.query(Document).filter(
            Document.username == self.username
        )
        if not delete_all:
            q_docs = q_docs.filter(Document.client_job_id.in_(client_job_ids))

        with sqla_transaction_commit():
            # Delete the Document objects, which will cascade-delete the
            # DocProcRequest objects.
            q_docs.delete(synchronize_session=False)

        # Return response
        return self.create_response(status=HttpStatus.OK)