Coverage for cc_modules/cc_taskcollection.py : 20%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
3"""
4camcops_server/cc_modules/cc_taskcollection.py
6===============================================================================
8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com).
10 This file is part of CamCOPS.
12 CamCOPS is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
17 CamCOPS is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License for more details.
22 You should have received a copy of the GNU General Public License
23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
25===============================================================================
27**Classes to fetch tasks from the database as efficiently as possible.**
29"""
31from collections import OrderedDict
32import datetime
33from enum import Enum
34import logging
35from threading import Thread
36from typing import (Dict, Generator, List, Optional, Tuple, Type,
37 TYPE_CHECKING, Union)
39from cardinal_pythonlib.json.serialize import (
40 register_class_for_json,
41 register_enum_for_json,
42)
43from cardinal_pythonlib.logs import BraceStyleAdapter
44from cardinal_pythonlib.reprfunc import auto_repr, auto_str
45from cardinal_pythonlib.sort import MINTYPE_SINGLETON, MinType
46from kombu.serialization import dumps, loads
47from pendulum import DateTime as Pendulum
48from sqlalchemy.orm import Query
49from sqlalchemy.orm.session import Session as SqlASession
50from sqlalchemy.sql.functions import func
51from sqlalchemy.sql.expression import and_, exists, or_
53from camcops_server.cc_modules.cc_constants import ERA_NOW
54from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
55from camcops_server.cc_modules.cc_task import (
56 tablename_to_task_class_dict,
57 Task,
58)
59from camcops_server.cc_modules.cc_taskfactory import (
60 task_query_restricted_to_permitted_users,
61)
62from camcops_server.cc_modules.cc_taskfilter import TaskFilter
63from camcops_server.cc_modules.cc_taskindex import TaskIndexEntry
65if TYPE_CHECKING:
66 from sqlalchemy.sql.elements import ClauseElement, ColumnElement
67 from camcops_server.cc_modules.cc_request import CamcopsRequest
log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Debugging options
# =============================================================================

# Set to True to log how long the non-index ("direct") fetch of all tasks
# takes; see TaskCollection._fetch_all_tasks_without_index().
DEBUG_QUERY_TIMING = False

if DEBUG_QUERY_TIMING:
    # Make it obvious in the logs that a debug switch has been left on.
    log.warning("Debugging options enabled!")
82# =============================================================================
83# Sorting helpers
84# =============================================================================
def task_when_created_sorter(task: Task) \
        -> Union[Tuple[Pendulum, datetime.datetime], MinType]:
    """
    Sort key for tasks: creation date/time, with the upload (batch-added)
    date/time as a tiebreaker so that ordering is consistent.

    Args:
        task: the task to compute a key for

    Returns:
        ``(when_created, _when_added_batch_utc)``, or ``MINTYPE_SINGLETON``
        if the task has no creation date/time (presumably so that such
        tasks compare below all others -- see
        ``cardinal_pythonlib.sort.MinType``).
    """
    when_created = task.when_created
    # noinspection PyProtectedMember
    when_uploaded = task._when_added_batch_utc
    if when_created is None:
        return MINTYPE_SINGLETON
    return when_created, when_uploaded
@register_enum_for_json
class TaskSortMethod(Enum):
    """
    Ways in which a list of tasks may be ordered (see
    :func:`sort_tasks_in_place`). Registered for JSON so it can be serialized
    along with a :class:`TaskCollection`.
    """
    NONE = 0  # leave the list in the order it was fetched
    CREATION_DATE_ASC = 1  # ascending by creation date/time
    CREATION_DATE_DESC = 2  # descending by creation date/time
def sort_tasks_in_place(tasklist: List[Task],
                        sortmethod: TaskSortMethod) -> None:
    """
    Reorder ``tasklist`` in place by creation date/time, as requested.

    Args:
        tasklist: the list of tasks; mutated in place
        sortmethod: a :class:`TaskSortMethod`. ``CREATION_DATE_ASC`` sorts
            ascending and ``CREATION_DATE_DESC`` descending, both by
            :func:`task_when_created_sorter`; any other value leaves the list
            untouched.
    """
    if sortmethod == TaskSortMethod.CREATION_DATE_ASC:
        descending = False
    elif sortmethod == TaskSortMethod.CREATION_DATE_DESC:
        descending = True
    else:
        return  # e.g. TaskSortMethod.NONE: keep the existing order
    tasklist.sort(key=task_when_created_sorter, reverse=descending)
125# =============================================================================
126# Parallel fetch helper
127# =============================================================================
128# - Why consider a parallel fetch?
129# Because a typical fetch might involve 27ms per query (as seen by Python;
130# less as seen by MySQL) but about 100 queries, for a not-very-large
131# database.
132# - Initially UNSUCCESSFUL: even after tweaking pool_size=0 in create_engine()
133# to get round the SQLAlchemy error "QueuePool limit of size 5 overflow 10
134# reached", in the parallel code, a great many queries are launched, but then
135# something goes wrong and others are started but then block -- for ages --
136# waiting for a spare database connection, or something.
137# - Fixed that: I was not explicitly closing the sessions.
138# - But then a major conceptual problem: anything to be lazy-loaded (e.g.
139# patient, but also patient ID, special note, BLOB...) will give this sort of
140# error: "DetachedInstanceError: Parent instance <Phq9 at 0x7fe6cce2d278> is
141# not bound to a Session; lazy load operation of attribute 'patient' cannot
142# proceed" -- for obvious reasons. And some of those operations are only
143# required on the final paginated task set, which requires aggregation across
144# all tasks.
145#
146# HOWEVER, the query time per table drops from ~27ms to 4-8ms if we disable
147# eager loading (lazy="joined") of patients from tasks.
class FetchThread(Thread):
    """
    Thread to fetch all tasks of one task class, in parallel with others.

    CURRENTLY UNUSED in practice: within this module it is only created by
    ``TaskCollection._fetch_all_tasks_without_index(parallel=True)``, which is
    deprecated (see the notes above about lazy loading and detached
    instances).

    On success, the fetched tasks are stored in the owning collection's
    ``_tasks_by_class`` dictionary under ``task_class``. On failure,
    :attr:`error` is set to ``True`` and the exception is logged.
    """
    def __init__(self,
                 req: "CamcopsRequest",
                 task_class: Type[Task],
                 factory: "TaskCollection",
                 **kwargs) -> None:
        """
        Args:
            req:
                The request; used to obtain a separate ("bare") database
                session for this thread.
            task_class:
                The task class to fetch; its table name becomes the thread's
                name.
            factory:
                The :class:`TaskCollection` that builds the query and receives
                the results.
            **kwargs:
                Passed to :class:`threading.Thread`.
        """
        self.req = req
        self.task_class = task_class
        self.factory = factory
        self.error = False
        name = task_class.__tablename__
        super().__init__(name=name, target=None, **kwargs)

    def run(self) -> None:
        """
        Runs the query for :attr:`task_class` in a private session, and stores
        the results in the collection. The session is always closed.
        """
        log.debug("Thread starting")
        dbsession = self.req.get_bare_dbsession()
        # noinspection PyBroadException
        try:
            # noinspection PyProtectedMember
            q = self.factory._make_query(dbsession, self.task_class)
            if q:
                tasks = q.all()  # type: List[Task]
                # https://stackoverflow.com/questions/6319207/are-lists-thread-safe  # noqa
                # https://stackoverflow.com/questions/6953351/thread-safety-in-pythons-dictionary  # noqa
                # http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm  # noqa
                # noinspection PyProtectedMember
                self.factory._tasks_by_class[self.task_class] = tasks
                log.debug("Thread finishing with results")
            else:
                log.debug("Thread finishing without results")
        except Exception:
            # A thread cannot propagate its exception to the caller, so flag
            # the failure and log it WITH its traceback.
            self.error = True
            log.exception("Thread error")
        finally:
            # Always release the session: unclosed sessions made other
            # threads block waiting for a free connection (see notes above).
            dbsession.close()
190# =============================================================================
191# Make a set of tasks, deferring work until things are needed
192# =============================================================================
class TaskCollection(object):
    """
    Represent a potential or instantiated call to fetch tasks from the
    database.

    The caller may want them in a giant list (e.g. task viewer, CTVs), or split
    by task class (e.g. trackers).

    Results are fetched lazily and cached: :attr:`_tasks_by_class`,
    :attr:`_all_tasks` and (for the index route) :attr:`_all_indexes`.
    """
    def __init__(self,
                 req: Optional["CamcopsRequest"],
                 taskfilter: TaskFilter = None,
                 as_dump: bool = False,
                 sort_method_by_class: TaskSortMethod = TaskSortMethod.NONE,
                 sort_method_global: TaskSortMethod = TaskSortMethod.NONE,
                 current_only: bool = True,
                 via_index: bool = True,
                 export_recipient: "ExportRecipient" = None) \
            -> None:
        """
        Args:
            req:
                The
                :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
                ``None`` should only be used as a parameter when serializing
                a :class:`TaskCollection` to the back-end.
            taskfilter:
                A :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`
                object that contains any restrictions we may want to apply.
                Must be supplied unless supplying ``export_recipient`` (in
                which case, must not be supplied).
            as_dump:
                Use the "dump" permissions rather than the "view" permissions?
            sort_method_by_class:
                How should we sort tasks within each task class?
            sort_method_global:
                How should we sort tasks overall (across all task types)?
            current_only:
                Restrict to ``_current`` tasks only?
            via_index:
                Use the server's index (faster)? (Not possible with
                ``current_only=False``; if both are requested, a warning is
                logged and the index is not used.)
            export_recipient:
                A :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`.
                If supplied, a :class:`TaskFilter` is built from its settings.

        Raises:
            AssertionError: (unless assertions are disabled) if both or
                neither of ``taskfilter`` and ``export_recipient`` are given.
        """  # noqa
        if via_index and not current_only:
            log.warning("Can't use index for non-current tasks")
            via_index = False

        self._req = req
        self._filter = taskfilter
        self._as_dump = as_dump
        self._sort_method_by_class = sort_method_by_class
        self._sort_method_global = sort_method_global
        self._current_only = current_only
        self._via_index = via_index
        self.export_recipient = export_recipient

        if export_recipient:
            # We create a new filter to reflect the export recipient.
            assert self._filter is None, (
                "Can't supply taskfilter if you supply export_recipient")
            # We can do lots of what we need with a TaskFilter().
            self._filter = TaskFilter()
            if not export_recipient.all_groups:
                self._filter.group_ids = export_recipient.group_ids
            self._filter.task_types = export_recipient.tasks
            self._filter.start_datetime = export_recipient.start_datetime_utc
            self._filter.end_datetime = export_recipient.end_datetime_utc
            self._filter.finalized_only = export_recipient.finalized_only
            self._filter.tasks_with_patient_only = not export_recipient.anonymous_ok()  # noqa
            self._filter.must_have_idnum_type = export_recipient.primary_idnum
        else:
            assert self._filter, (
                "Must supply taskfilter unless you supply export_recipient")

        # Lazily-filled caches:
        self._tasks_by_class = OrderedDict()  # type: Dict[Type[Task], List[Task]]  # noqa
        self._all_tasks = None  # type: Optional[List[Task]]
        self._all_indexes = None  # type: Optional[Union[List[TaskIndexEntry], Query]]  # noqa

    def __repr__(self) -> str:
        return auto_repr(self)

    def __str__(self) -> str:
        return auto_str(self)

    # =========================================================================
    # Interface to read
    # =========================================================================

    @property
    def req(self) -> "CamcopsRequest":
        """
        Returns the associated request, or raises :exc:`AssertionError` if it's
        not been set.
        """
        assert self._req is not None, (
            "Must initialize with a request or call set_request() first"
        )
        return self._req

    def set_request(self, req: "CamcopsRequest") -> None:
        """
        Sets the request object manually. Used by Celery back-end tasks.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        self._req = req

    def task_classes(self) -> List[Type[Task]]:
        """
        Return a list of task classes that we want.
        """
        return self._filter.task_classes

    def tasks_for_task_class(self, task_class: Type[Task]) -> List[Task]:
        """
        Returns all appropriate task instances for a specific task type
        (an empty list if there are none).
        """
        if self._via_index:
            self._ensure_everything_fetched_via_index()
        else:
            self._fetch_task_class(task_class)
        return self._tasks_by_class.get(task_class, [])

    @property
    def all_tasks(self) -> List[Task]:
        """
        Returns a list of all appropriate task instances.
        """
        if self._all_tasks is None:
            if self._via_index:
                self._ensure_everything_fetched_via_index()
            else:
                self._fetch_all_tasks_without_index()
        return self._all_tasks

    @property
    def all_tasks_or_indexes_or_query(self) \
            -> Union[List[Task], List[TaskIndexEntry], Query]:
        """
        Returns a list of all appropriate task instances, or index entries, or
        a query returning them.

        - Returning a list of tasks is fine, but the results of this function
          may be paginated (e.g. in the main task view), so the end result may
          be that e.g. 20,000 tasks are fetched and 20 are shown.
        - More efficient is to fetch 20,000 indexes from the single index
          table, and fetch only the 20 tasks we need.
        - More efficient still is to fetch the 20 indexes we need, and then
          their task.
        """
        if not self._via_index:
            return self.all_tasks

        self._build_index_query()  # ensure self._all_indexes is set

        if self._all_tasks is not None:
            # The tasks themselves have been fetched.
            return self._all_tasks

        return self._all_indexes  # indexes or a query to fetch them

    # def forget_task_class(self, task_class: Type[Task]) -> None:
    #     """
    #     Ditch results for a specific task class (for memory efficiency).
    #     """
    #     self._tasks_by_class.pop(task_class, None)
    #     # The "None" option prevents it from raising KeyError if the key
    #     # doesn't exist.
    #     # https://stackoverflow.com/questions/11277432/how-to-remove-a-key-from-a-python-dictionary  # noqa

    def gen_all_tasks_or_indexes(self) \
            -> Generator[Union[Task, TaskIndexEntry], None, None]:
        """
        Generates tasks or index entries (executing the index query first, if
        that is what :attr:`all_tasks_or_indexes_or_query` holds).
        """
        tasks_or_indexes_or_query = self.all_tasks_or_indexes_or_query
        if isinstance(tasks_or_indexes_or_query, Query):
            yield from tasks_or_indexes_or_query.all()
        else:
            yield from tasks_or_indexes_or_query

    def gen_tasks_by_class(self) -> Generator[Task, None, None]:
        """
        Generates all tasks, class-wise.
        """
        for cls in self.task_classes():
            yield from self.tasks_for_task_class(cls)

    def gen_tasks_in_global_order(self) -> Generator[Task, None, None]:
        """
        Generates all tasks, in the global order.
        """
        yield from self.all_tasks

    @property
    def dbsession(self) -> SqlASession:
        """
        Returns the request's database session.
        """
        return self.req.dbsession

    # =========================================================================
    # Internals: fetching Task objects
    # =========================================================================

    def _fetch_all_tasks_without_index(self, parallel: bool = False) -> None:
        """
        Fetch all tasks from the database, class by class (not via the index),
        and build :attr:`_all_tasks`, sorted by the global sort method.

        Args:
            parallel:
                Use the deprecated multithreaded fetch via
                :class:`FetchThread`? AVOID; see the notes above that class.

        Raises:
            ValueError: if ``parallel`` is used and any thread failed.
        """
        if DEBUG_QUERY_TIMING:
            start_time = Pendulum.now()

        if parallel:
            # Deprecated parallel fetch
            threads = []  # type: List[FetchThread]
            for task_class in self._filter.task_classes:
                thread = FetchThread(self.req, task_class, self)
                thread.start()
                threads.append(thread)
            # Wait for ALL threads before reporting any failure, so that no
            # thread is left running (holding a session) after we raise.
            for thread in threads:
                thread.join()
            if any(thread.error for thread in threads):
                raise ValueError("Multithreaded fetch failed")
            # The threads store raw query results. Apply the same Python-side
            # filtering and per-class sorting as _fetch_task_class() does.
            for task_class, tasks in list(self._tasks_by_class.items()):
                tasks = self._filter_through_python(tasks)
                sort_tasks_in_place(tasks, self._sort_method_by_class)
                self._tasks_by_class[task_class] = tasks
        else:
            # Fetch all tasks, classwise.
            for task_class in self._filter.task_classes:
                self._fetch_task_class(task_class)

        if DEBUG_QUERY_TIMING:
            end_time = Pendulum.now()
            # noinspection PyUnboundLocalVariable
            time_taken = end_time - start_time
            log.info("_fetch_all_tasks took {}", time_taken)

        # Build our joint task list
        self._all_tasks = []  # type: List[Task]
        for single_task_list in self._tasks_by_class.values():
            self._all_tasks.extend(single_task_list)
        sort_tasks_in_place(self._all_tasks, self._sort_method_global)

    def _fetch_task_class(self, task_class: Type[Task]) -> None:
        """
        Fetch tasks from the database for one task type (once; later calls
        for the same class do nothing), apply Python-side filters, and sort
        them by the per-class sort method.
        """
        if task_class in self._tasks_by_class:
            return  # already fetched
        q = self._serial_query(task_class)
        if q is None:
            newtasks = []  # type: List[Task]
        else:
            newtasks = q.all()  # type: List[Task]
            # Apply Python-side filters?
            newtasks = self._filter_through_python(newtasks)
        sort_tasks_in_place(newtasks, self._sort_method_by_class)
        self._tasks_by_class[task_class] = newtasks

    def _serial_query(self, task_class: Type[Task]) -> Optional[Query]:
        """
        Make and return an SQLAlchemy ORM query for a specific task class,
        using the request's own database session.

        Returns ``None`` if no tasks would match our criteria.
        """
        dbsession = self.req.dbsession
        return self._make_query(dbsession, task_class)

    def _make_query(self, dbsession: SqlASession,
                    task_class: Type[Task]) -> Optional[Query]:
        """
        Make and return an SQLAlchemy ORM query for a specific task class.

        Returns ``None`` if no tasks would match our criteria.
        """
        q = dbsession.query(task_class)

        # Restrict to what the web front end will supply
        if self._current_only:
            # noinspection PyProtectedMember
            q = q.filter(task_class._current == True)  # noqa: E712

        # Restrict to what is PERMITTED
        q = task_query_restricted_to_permitted_users(
            self.req, q, task_class, as_dump=self._as_dump)

        # Restrict to what is DESIRED
        if q:
            q = self._task_query_restricted_by_filter(q, task_class)
        if q and self.export_recipient:
            q = self._task_query_restricted_by_export_recipient(q, task_class)

        return q

    def _task_query_restricted_by_filter(self,
                                         q: Query,
                                         cls: Type[Task]) -> Optional[Query]:
        """
        Restricts an SQLAlchemy ORM query for a given task class to those
        tasks that our filter permits.

        THIS IS A KEY SECURITY FUNCTION, since it implements some permissions
        that relate to viewing tasks when unfiltered.

        Args:
            q: the starting SQLAlchemy ORM Query
            cls: the task class

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter

        """
        tf = self._filter  # task filter
        user = self.req.user

        if tf.group_ids:
            permitted_group_ids = tf.group_ids.copy()
        else:
            permitted_group_ids = None  # unrestricted

        if tf.dates_inconsistent():
            return None

        if cls not in tf.task_classes:
            # We don't want this task
            return None

        if not cls.is_anonymous:
            # Not anonymous.
            if not tf.any_specific_patient_filtering():
                # No patient filtering. Permissions depend on user settings.
                if user.may_view_all_patients_when_unfiltered:
                    # May see everything. No restrictions.
                    pass
                elif user.may_view_no_patients_when_unfiltered:
                    # Can't see patient data from any group.
                    # (a) User not permitted to view any patients when
                    # unfiltered, and (b) not filtered to a level that would
                    # reasonably restrict to one or a small number of
                    # patients. Skip the task class.
                    return None
                else:
                    # May see patient data from some, but not all, groups.
                    liberal_group_ids = user.group_ids_that_nonsuperuser_may_see_when_unfiltered()  # noqa
                    if not permitted_group_ids:  # was unrestricted
                        permitted_group_ids = liberal_group_ids
                    else:  # was restricted; restrict further
                        permitted_group_ids = [
                            gid for gid in permitted_group_ids
                            if gid in liberal_group_ids
                        ]
                        if not permitted_group_ids:
                            return None  # down to zero; no point continuing

            # Patient filtering
            if tf.any_patient_filtering():
                # q = q.join(Patient)  # fails
                q = q.join(cls.patient)  # use explicitly configured relationship  # noqa
                q = tf.filter_query_by_patient(q, via_index=False)

        # Patient-independent filtering

        if tf.device_ids:
            # noinspection PyProtectedMember
            q = q.filter(cls._device_id.in_(tf.device_ids))

        if tf.era:
            # noinspection PyProtectedMember
            q = q.filter(cls._era == tf.era)
        if tf.finalized_only:
            q = q.filter(cls._era != ERA_NOW)

        if tf.adding_user_ids:
            # noinspection PyProtectedMember
            q = q.filter(cls._adding_user_id.in_(tf.adding_user_ids))

        if permitted_group_ids:
            # noinspection PyProtectedMember
            q = q.filter(cls._group_id.in_(permitted_group_ids))

        if tf.start_datetime is not None:
            q = q.filter(cls.when_created >= tf.start_datetime)
        if tf.end_datetime is not None:
            q = q.filter(cls.when_created < tf.end_datetime)

        q = self._filter_query_for_text_contents(q, cls)

        return q

    def _task_query_restricted_by_export_recipient(
            self, q: Query, cls: Type[Task]) -> Optional[Query]:
        """
        For exports.

        Filters via our
        :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`,
        except for the bits already implemented via our
        :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`.

        The main job here is for incremental exports: to find tasks that have
        not yet been exported. We look for any tasks not yet exported to a
        recipient of the same name (regardless of ``ExportRecipient.id``, which
        changes when the export recipient is reconfigured).

        Compare :meth:`_index_query_restricted_by_export_recipient`.

        Args:
            q: the starting SQLAlchemy ORM Query
            cls: the task class

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter
        """
        from camcops_server.cc_modules.cc_exportmodels import ExportedTask  # delayed import  # noqa

        r = self.export_recipient
        if not r.is_incremental():
            # Full database export; no restrictions
            return q
        # Otherwise, restrict to tasks not yet sent to this recipient.
        # noinspection PyUnresolvedReferences
        q = q.filter(
            # "There is not a successful export record for this task/recipient"
            ~exists().select_from(
                ExportedTask.__table__.join(
                    ExportRecipient.__table__,
                    ExportedTask.recipient_id == ExportRecipient.id
                )
            ).where(
                and_(
                    ExportRecipient.recipient_name == r.recipient_name,
                    ExportedTask.basetable == cls.__tablename__,
                    ExportedTask.task_server_pk == cls._pk,
                    ExportedTask.success == True,  # noqa: E712
                    ExportedTask.cancelled == False,  # noqa: E712
                )
            )
        )
        return q

    def _filter_through_python(self, tasks: List[Task]) -> List[Task]:
        """
        Returns those tasks in the list provided that pass any Python-only
        aspects of our filter (those parts not easily calculable via SQL).

        This applies to the "direct" (and not "via index") routes only. With
        the index, we can do everything via SQL.
        """
        assert not self._via_index
        if not self._has_python_parts_to_filter():
            return tasks
        return [
            t for t in tasks
            if self._task_matches_python_parts_of_filter(t)
        ]

    def _has_python_parts_to_filter(self) -> bool:
        """
        Does the filter have aspects to it that require some Python thought,
        not just a database query?

        Only applicable to the direct (not "via index") route.
        """
        assert not self._via_index
        return self._filter.complete_only

    def _task_matches_python_parts_of_filter(self, task: Task) -> bool:
        """
        Does the task pass the Python parts of the filter?

        Only applicable to the direct (not "via index") route.
        """
        assert not self._via_index

        # "Is task complete" filter
        if self._filter.complete_only:
            if not task.is_complete():
                return False

        return True

    # =========================================================================
    # Shared between Task and TaskIndexEntry methods
    # =========================================================================

    def _filter_query_for_text_contents(
            self, q: Query, taskclass: Type[Task]) -> Optional[Query]:
        """
        Returns the query, filtered for the "text contents" filter.

        Args:
            q: the starting SQLAlchemy ORM Query
            taskclass: the task class

        Returns:
            a Query, potentially modified, or ``None`` if text filtering was
            requested but the task class has no text columns to search.
        """
        tf = self._filter  # task filter

        if not tf.text_contents:
            return q  # unmodified

        # task must contain ALL the strings in AT LEAST ONE text column
        textcols = taskclass.get_text_filter_columns()
        if not textcols:
            # Text filtering requested, but there are no text columns, so
            # by definition the filter must fail.
            return None
        clauses_over_text_phrases = []  # type: List[ColumnElement]
        # ... each e.g. "col1 LIKE '%paracetamol%' OR col2 LIKE '%paracetamol%'"  # noqa
        for textfilter in tf.text_contents:
            tf_lower = textfilter.lower()
            clauses_over_columns = []  # type: List[ColumnElement]
            # ... each e.g. "col1 LIKE '%paracetamol%'"
            for textcol in textcols:
                # Case-insensitive comparison:
                # https://groups.google.com/forum/#!topic/sqlalchemy/331XoToT4lk
                # https://bitbucket.org/zzzeek/sqlalchemy/wiki/UsageRecipes/StringComparisonFilter  # noqa
                clauses_over_columns.append(
                    func.lower(textcol).contains(tf_lower, autoescape=True)
                )
            clauses_over_text_phrases.append(
                or_(*clauses_over_columns)
            )
        # Thus, e.g.
        # "(col1 LIKE '%paracetamol%' OR col2 LIKE '%paracetamol%') AND
        # (col1 LIKE '%overdose%' OR col2 LIKE '%overdose%')"
        return q.filter(and_(*clauses_over_text_phrases))

    # =========================================================================
    # Internals: fetching TaskIndexEntry objects
    # =========================================================================

    def _ensure_everything_fetched_via_index(self) -> None:
        """
        Ensure we have all our tasks loaded, using the index.
        """
        self._build_index_query()
        self._fetch_tasks_from_indexes()

    def _build_index_query(self) -> None:
        """
        Creates a Query in :attr:`_all_indexes` that will fetch task indexes
        (or an empty list, if no task could match).
        If the task filtering requires the tasks to be fetched (i.e. text
        contents), fetch the actual tasks too (and filter them).
        """
        if self._all_indexes is not None:
            return
        q = self._make_index_query()
        # _make_index_query() returns None when no task can match (e.g.
        # inconsistent dates). Store an empty list rather than None, so that
        # this work is not repeated and so that downstream code (which asserts
        # or iterates) sees an empty result instead of crashing.
        self._all_indexes = [] if q is None else q
        if self._filter.text_contents:
            self._fetch_tasks_from_indexes()

    def _fetch_tasks_from_indexes(self) -> None:
        """
        Takes the query (or list) that has already been stored in
        :attr:`_all_indexes`, and populates the task attributes,
        :attr:`_all_tasks` and :attr:`_tasks_by_class`.
        """
        if self._all_tasks is not None:
            return
        assert self._all_indexes is not None

        d = tablename_to_task_class_dict()
        dbsession = self.req.dbsession
        self._all_tasks = []  # type: List[Task]

        # Fetch indexes
        if isinstance(self._all_indexes, Query):
            # Query built, but indexes not yet fetched.
            # Replace the query with actual indexes
            self._all_indexes = self._all_indexes.all()  # type: List[TaskIndexEntry]  # noqa
        indexes = self._all_indexes

        # Group task PKs by table in a single pass (rather than rescanning all
        # indexes once per table). OrderedDict keeps tables in order of first
        # appearance, so results do not depend on set/hash ordering.
        task_pks_by_tablename = OrderedDict()
        for index in indexes:
            task_pks_by_tablename.setdefault(
                index.task_table_name, []).append(index.task_pk)

        # Fetch tasks
        for tablename, task_pks in task_pks_by_tablename.items():
            # We do this by task class, so we can execute a single query per
            # task type (rather than per task).
            try:
                taskclass = d[tablename]
            except KeyError:
                log.warning("Bad tablename in index: {!r}", tablename)
                continue
            tasklist = self._tasks_by_class.setdefault(taskclass, [])
            # noinspection PyProtectedMember
            qtask = (
                dbsession.query(taskclass)
                .filter(taskclass._pk.in_(task_pks))
            )
            qtask = self._filter_query_for_text_contents(qtask, taskclass)
            if qtask is None:
                # Text filtering requested, but this task type has no text
                # columns, so none of its tasks can match.
                continue
            tasks = qtask.all()  # type: List[Task]
            tasklist.extend(tasks)
            self._all_tasks.extend(tasks)

        # Sort tasks
        for tasklist in self._tasks_by_class.values():
            sort_tasks_in_place(tasklist, self._sort_method_by_class)
        sort_tasks_in_place(self._all_tasks, self._sort_method_global)

    def _make_index_query(self) -> Optional[Query]:
        """
        Make and return an SQLAlchemy ORM query to retrieve indexes.

        Returns ``None`` if no tasks would match our criteria.
        """
        dbsession = self.req.dbsession
        q = dbsession.query(TaskIndexEntry)

        # Restrict to what the web front end will supply
        assert self._current_only, "_current_only must be true to use index"

        # Restrict to what is PERMITTED
        if not self.export_recipient:
            q = task_query_restricted_to_permitted_users(
                self.req, q, TaskIndexEntry, as_dump=self._as_dump)

        # Restrict to what is DESIRED
        if q:
            q = self._index_query_restricted_by_filter(q)
        if q and self.export_recipient:
            q = self._index_query_restricted_by_export_recipient(q)

        return q

    def _index_query_restricted_by_filter(self, q: Query) -> Optional[Query]:
        """
        Counterpart to :func:`_task_query_restricted_by_filter`, but for
        indexes.

        THIS IS A KEY SECURITY FUNCTION, since it implements some permissions
        that relate to viewing tasks when unfiltered.

        Args:
            q: the starting SQLAlchemy ORM Query

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter

        """
        tf = self._filter  # task filter
        user = self.req.user

        if tf.group_ids:
            permitted_group_ids = tf.group_ids.copy()
        else:
            permitted_group_ids = None  # unrestricted

        if tf.dates_inconsistent():
            return None

        # Task type filtering

        if tf.skip_anonymous_tasks():
            # noinspection PyPep8
            q = q.filter(TaskIndexEntry.patient_pk != None)  # noqa: E711

        if not tf.offers_all_non_anonymous_task_types():
            permitted_task_tablenames = [
                tc.__tablename__ for tc in tf.task_classes]
            q = q.filter(TaskIndexEntry.task_table_name.in_(
                permitted_task_tablenames
            ))

        # Special rules when we've not filtered for any patients

        if not tf.any_specific_patient_filtering():
            # No patient filtering. Permissions depend on user settings.
            if user.may_view_all_patients_when_unfiltered:
                # May see everything. No restrictions.
                pass
            elif user.may_view_no_patients_when_unfiltered:
                # Can't see patient data from any group.
                # (a) User not permitted to view any patients when
                # unfiltered, and (b) not filtered to a level that would
                # reasonably restrict to one or a small number of
                # patients. Restrict to anonymous tasks.
                # noinspection PyPep8
                q = q.filter(TaskIndexEntry.patient_pk == None)  # noqa: E711
            else:
                # May see patient data from some, but not all, groups.
                # This is a little more complex than the equivalent in
                # _task_query_restricted_by_filter(), because we shouldn't
                # restrict anonymous tasks.
                liberal_group_ids = user.group_ids_that_nonsuperuser_may_see_when_unfiltered()  # noqa
                # noinspection PyPep8
                liberal_or_anon_criteria = [
                    TaskIndexEntry.patient_pk == None  # anonymous OK  # noqa: E501,E711
                ]  # type: List[ClauseElement]
                for gid in liberal_group_ids:
                    liberal_or_anon_criteria.append(
                        TaskIndexEntry.group_id == gid  # this group OK
                    )
                q = q.filter(or_(*liberal_or_anon_criteria))

        # Patient filtering

        if tf.any_patient_filtering():
            q = q.join(TaskIndexEntry.patient)  # use relationship
            q = tf.filter_query_by_patient(q, via_index=True)

        # Patient-independent filtering

        if tf.device_ids:
            q = q.filter(TaskIndexEntry.device_id.in_(tf.device_ids))

        if tf.era:
            q = q.filter(TaskIndexEntry.era == tf.era)
        if tf.finalized_only:
            q = q.filter(TaskIndexEntry.era != ERA_NOW)

        if tf.adding_user_ids:
            q = q.filter(TaskIndexEntry.adding_user_id.in_(tf.adding_user_ids))

        if permitted_group_ids:
            q = q.filter(TaskIndexEntry.group_id.in_(permitted_group_ids))

        if tf.start_datetime is not None:
            q = q.filter(TaskIndexEntry.when_created_utc >= tf.start_datetime_utc)  # noqa
        if tf.end_datetime is not None:
            q = q.filter(TaskIndexEntry.when_created_utc < tf.end_datetime_utc)  # noqa

        # text_contents is managed at the later fetch stage when using indexes

        # But is_complete can be filtered now and in SQL:
        if tf.complete_only:
            # noinspection PyPep8
            q = q.filter(TaskIndexEntry.task_is_complete == True)  # noqa: E712

        # When we use indexes, we embed the global sort criteria in the query.
        if self._sort_method_global == TaskSortMethod.CREATION_DATE_ASC:
            q = q.order_by(TaskIndexEntry.when_created_utc.asc(),
                           TaskIndexEntry.when_added_batch_utc.asc())
        elif self._sort_method_global == TaskSortMethod.CREATION_DATE_DESC:
            q = q.order_by(TaskIndexEntry.when_created_utc.desc(),
                           TaskIndexEntry.when_added_batch_utc.desc())

        return q

    def _index_query_restricted_by_export_recipient(self, q: Query) \
            -> Optional[Query]:
        """
        For exports.

        Filters via our
        :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`,
        except for the bits already implemented via our
        :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`.

        The main job here is for incremental exports: to find tasks that have
        not yet been exported.

        Compare :meth:`_task_query_restricted_by_export_recipient`.

        Args:
            q: the starting SQLAlchemy ORM Query

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter

        """
        from camcops_server.cc_modules.cc_exportmodels import ExportedTask  # delayed import  # noqa

        r = self.export_recipient
        if not r.is_incremental():
            # Full database export; no restrictions
            return q
        # Otherwise, restrict to tasks not yet sent to this recipient.
        # Remember: q is a query on TaskIndexEntry.
        # noinspection PyUnresolvedReferences
        q = q.filter(
            # "There is not a successful export record for this task/recipient"
            ~exists().select_from(
                ExportedTask.__table__.join(
                    ExportRecipient.__table__,
                    ExportedTask.recipient_id == ExportRecipient.id
                )
            ).where(
                and_(
                    ExportRecipient.recipient_name == r.recipient_name,
                    ExportedTask.basetable == TaskIndexEntry.task_table_name,
                    # ... don't use ".tablename" as a property doesn't play
                    # nicely with SQLAlchemy here
                    ExportedTask.task_server_pk == TaskIndexEntry.task_pk,
                    ExportedTask.success == True,  # noqa: E712
                    ExportedTask.cancelled == False,  # noqa: E712
                )
            )
        )
        return q
# noinspection PyProtectedMember
def encode_task_collection(coll: TaskCollection) -> Dict:
    """
    Serializes a :class:`TaskCollection`.

    The request is not serialized and must be rebuilt in another way; see e.g.
    :func:`camcops_server.cc_modules.celery.email_basic_dump`.

    The export recipient is not serialized either; only the
    :class:`TaskFilter` (which, for an export collection, was derived from the
    recipient) is.

    Args:
        coll: the collection to serialize

    Returns:
        a JSON-compatible dictionary, readable by
        :func:`decode_task_collection`
    """
    return {
        "taskfilter": dumps(coll._filter, serializer="json"),
        "as_dump": coll._as_dump,
        "sort_method_by_class": dumps(coll._sort_method_by_class,
                                      serializer="json"),
        # These were previously omitted, so a serialization round trip
        # silently reset them to their constructor defaults.
        "sort_method_global": dumps(coll._sort_method_global,
                                    serializer="json"),
        "current_only": coll._current_only,
        "via_index": coll._via_index,
    }
# noinspection PyUnusedLocal
def decode_task_collection(d: Dict, cls: Type) -> TaskCollection:
    """
    Creates a :class:`TaskCollection` from a serialized version.

    The request is not serialized and must be rebuilt in another way; see e.g.
    :func:`camcops_server.cc_modules.celery.email_basic_dump`.

    Keys added to the serialized form later (``sort_method_global``,
    ``current_only``, ``via_index``) are optional, so that data serialized by
    older code still decodes (with constructor defaults for missing keys).

    Args:
        d: the dictionary produced by :func:`encode_task_collection`
        cls: the class being decoded (unused)

    Returns:
        a :class:`TaskCollection` with no request attached
    """
    kwargs = {
        "taskfilter": loads(*reorder_args(*d["taskfilter"])),
        "as_dump": d["as_dump"],
        "sort_method_by_class": loads(
            *reorder_args(*d["sort_method_by_class"])),
    }
    if "sort_method_global" in d:
        kwargs["sort_method_global"] = loads(
            *reorder_args(*d["sort_method_global"]))
    if "current_only" in d:
        kwargs["current_only"] = d["current_only"]
    if "via_index" in d:
        kwargs["via_index"] = d["via_index"]
    return TaskCollection(req=None, **kwargs)
def reorder_args(content_type: str,
                 content_encoding: str,
                 data: str) -> List[str]:
    """
    Convert kombu ``dumps()`` output order into ``loads()`` argument order.

    kombu's :func:`SerializerRegistry.dumps` returns
    ``(content_type, content_encoding, data)``, but
    :func:`SerializerRegistry.loads` takes ``data`` as its first argument,
    followed by ``content_type`` and ``content_encoding``.

    Returns:
        ``[data, content_type, content_encoding]``
    """
    reordered = [data]
    reordered.extend((content_type, content_encoding))
    return reordered
# Tell cardinal_pythonlib's JSON layer how to turn a TaskCollection into a
# dictionary and back (used when handing collections to the Celery back end).
register_class_for_json(
    cls=TaskCollection,
    dict_to_obj_fn=decode_task_collection,
    obj_to_dict_fn=encode_task_collection,
)