Coverage for cc_modules/cc_taskcollection.py: 45%
376 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
1"""
2camcops_server/cc_modules/cc_taskcollection.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Classes to fetch tasks from the database as efficiently as possible.**
28"""
30from collections import OrderedDict
31import datetime
32from enum import Enum
33import logging
34from threading import Thread
35from typing import (
36 Any,
37 Dict,
38 Generator,
39 List,
40 Optional,
41 Tuple,
42 Type,
43 TYPE_CHECKING,
44 Union,
45)
47from cardinal_pythonlib.json.serialize import (
48 register_class_for_json,
49 register_enum_for_json,
50)
51from cardinal_pythonlib.logs import BraceStyleAdapter
52from cardinal_pythonlib.reprfunc import auto_repr, auto_str
53from cardinal_pythonlib.sort import MINTYPE_SINGLETON, MinType
54from kombu.serialization import dumps, loads
55from pendulum import DateTime as Pendulum
56from sqlalchemy.orm import Query
57from sqlalchemy.orm.session import Session as SqlASession
58from sqlalchemy.sql.functions import func
59from sqlalchemy.sql.expression import and_, exists, or_
61from camcops_server.cc_modules.cc_constants import ERA_NOW
62from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
63from camcops_server.cc_modules.cc_task import (
64 tablename_to_task_class_dict,
65 Task,
66)
67from camcops_server.cc_modules.cc_taskfactory import (
68 task_query_restricted_to_permitted_users,
69)
70from camcops_server.cc_modules.cc_taskfilter import TaskFilter
71from camcops_server.cc_modules.cc_taskindex import TaskIndexEntry
73if TYPE_CHECKING:
74 from sqlalchemy.sql.elements import ClauseElement, ColumnElement
75 from camcops_server.cc_modules.cc_request import CamcopsRequest
77log = BraceStyleAdapter(logging.getLogger(__name__))
80# =============================================================================
81# Debugging options
82# =============================================================================
# When True, the non-index fetch route logs how long its queries took.
DEBUG_QUERY_TIMING = False

if DEBUG_QUERY_TIMING:
    # Make it obvious in the log that a debugging switch was left on.
    log.warning("Debugging options enabled!")
90# =============================================================================
91# Sorting helpers
92# =============================================================================
def task_when_created_sorter(
    task: Task,
) -> Union[Tuple[Pendulum, datetime.datetime], MinType]:
    """
    Sort key for tasks: creation date/time first, with the upload (batch)
    date/time as a tiebreak so that the ordering is consistent.

    Args:
        task: the task for which a sort key is wanted

    Returns:
        the ``MINTYPE_SINGLETON`` sentinel if the task has no creation
        date/time; otherwise the tuple ``(created, uploaded)``
    """
    when_created = task.when_created
    # noinspection PyProtectedMember
    when_uploaded = task._when_added_batch_utc
    if when_created is None:
        return MINTYPE_SINGLETON
    return when_created, when_uploaded
@register_enum_for_json
class TaskSortMethod(Enum):
    """
    The ways in which a list of tasks can be ordered.
    """

    NONE = 0  # no sorting is applied
    CREATION_DATE_ASC = 1  # by creation date/time, ascending
    CREATION_DATE_DESC = 2  # by creation date/time, descending
def sort_tasks_in_place(
    tasklist: List[Task], sortmethod: TaskSortMethod
) -> None:
    """
    Reorder ``tasklist`` (modifying the list itself) as ``sortmethod``
    dictates. Any method other than the two creation-date orders leaves the
    list untouched.

    Args:
        tasklist: the list of tasks
        sortmethod: a :class:`TaskSortMethod` enum
    """
    ascending = sortmethod == TaskSortMethod.CREATION_DATE_ASC
    descending = sortmethod == TaskSortMethod.CREATION_DATE_DESC
    if not (ascending or descending):
        return  # nothing to do
    tasklist.sort(key=task_when_created_sorter, reverse=descending)
137# =============================================================================
138# Parallel fetch helper
139# =============================================================================
140# - Why consider a parallel fetch?
141# Because a typical fetch might involve 27ms per query (as seen by Python;
142# less as seen by MySQL) but about 100 queries, for a not-very-large
143# database.
144# - Initially UNSUCCESSFUL: even after tweaking pool_size=0 in create_engine()
145# to get round the SQLAlchemy error "QueuePool limit of size 5 overflow 10
146# reached", in the parallel code, a great many queries are launched, but then
147# something goes wrong and others are started but then block -- for ages --
148# waiting for a spare database connection, or something.
149# - Fixed that: I was not explicitly closing the sessions.
150# - But then a major conceptual problem: anything to be lazy-loaded (e.g.
151# patient, but also patient ID, special note, BLOB...) will give this sort of
152# error: "DetachedInstanceError: Parent instance <Phq9 at 0x7fe6cce2d278> is
153# not bound to a Session; lazy load operation of attribute 'patient' cannot
154# proceed" -- for obvious reasons. And some of those operations are only
155# required on the final paginated task set, which requires aggregation across
156# all tasks.
157#
158# HOWEVER, the query time per table drops from ~27ms to 4-8ms if we disable
159# eager loading (lazy="joined") of patients from tasks.
class FetchThread(Thread):
    """
    Thread to fetch the tasks of one task class, for parallel fetching.

    CURRENTLY UNUSED.

    The thread obtains its own database session from the request, runs the
    query that the owning :class:`TaskCollection` builds for the task class,
    and stores the resulting list in the collection's ``_tasks_by_class``
    dictionary. Failures are logged (with traceback) and flagged via
    :attr:`error` rather than raised.
    """

    def __init__(
        self,
        req: "CamcopsRequest",
        task_class: Type[Task],
        factory: "TaskCollection",
        **kwargs: Any
    ) -> None:
        """
        Args:
            req: the request, used to obtain a database session
            task_class: the task class whose tasks this thread fetches
            factory: the :class:`TaskCollection` that builds the query and
                receives the results
            kwargs: passed on to :class:`threading.Thread`
        """
        self.req = req
        self.task_class = task_class
        self.factory = factory
        self.error = False  # set to True by run() if the fetch fails
        name = task_class.__tablename__
        super().__init__(name=name, target=None, **kwargs)

    def run(self) -> None:
        """
        Fetch the tasks and hand them to the factory.

        Never raises for ordinary exceptions: the caller is expected to
        inspect :attr:`error` after joining the thread.
        """
        log.debug("Thread starting")
        dbsession = self.req.get_bare_dbsession()
        # noinspection PyBroadException
        try:
            # noinspection PyProtectedMember
            q = self.factory._make_query(dbsession, self.task_class)
            if q:
                tasks = q.all()  # type: List[Task]
                # https://stackoverflow.com/questions/6319207/are-lists-thread-safe  # noqa
                # https://stackoverflow.com/questions/6953351/thread-safety-in-pythons-dictionary  # noqa
                # http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm  # noqa
                # noinspection PyProtectedMember
                self.factory._tasks_by_class[self.task_class] = tasks
                log.debug("Thread finishing with results")
            else:
                log.debug("Thread finishing without results")
        except Exception:
            self.error = True
            # Keep the traceback: otherwise the cause of the failure is lost.
            log.exception("Thread error")
        finally:
            # Sessions must be closed explicitly (see the notes on parallel
            # fetching earlier in this module), whatever way we leave.
            dbsession.close()
206# =============================================================================
207# Make a set of tasks, deferring work until things are needed
208# =============================================================================
211class TaskCollection(object):
212 """
213 Represent a potential or instantiated call to fetch tasks from the
214 database.
216 The caller may want them in a giant list (e.g. task viewer, CTVs), or split
217 by task class (e.g. trackers).
218 """
def __init__(
    self,
    req: Optional["CamcopsRequest"],
    taskfilter: TaskFilter = None,
    as_dump: bool = False,
    sort_method_by_class: TaskSortMethod = TaskSortMethod.NONE,
    sort_method_global: TaskSortMethod = TaskSortMethod.NONE,
    current_only: bool = True,
    via_index: bool = True,
    export_recipient: "ExportRecipient" = None,
) -> None:
    """
    Describe which tasks are wanted; nothing is fetched until asked for.

    Args:
        req:
            The
            :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
            Pass ``None`` only when serializing a :class:`TaskCollection`
            to the back-end.
        taskfilter:
            A :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`
            holding the restrictions to apply. Required, unless
            ``export_recipient`` is given (in which case it must be
            omitted, and a filter is built from the recipient).
        as_dump:
            Apply "dump" permissions instead of "view" permissions?
        sort_method_by_class:
            Ordering of tasks within each task class.
        sort_method_global:
            Ordering of tasks overall (across all task types).
        current_only:
            Restrict to ``_current`` tasks only?
        via_index:
            Use the server's index (faster)? Ignored, with a warning, if
            ``current_only`` is false.
        export_recipient:
            A :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
    """  # noqa
    if via_index and not current_only:
        log.warning("Can't use index for non-current tasks")
        via_index = False

    if export_recipient:
        assert (
            taskfilter is None
        ), "Can't supply taskfilter if you supply export_recipient"
        # Much of what the recipient asks for can be expressed as an
        # ordinary TaskFilter, so translate it into one.
        recipient_filter = TaskFilter()
        if not export_recipient.all_groups:
            recipient_filter.group_ids = export_recipient.group_ids
        recipient_filter.task_types = export_recipient.tasks
        recipient_filter.start_datetime = export_recipient.start_datetime_utc  # type: ignore[assignment]  # noqa: E501
        recipient_filter.end_datetime = export_recipient.end_datetime_utc  # type: ignore[assignment]  # noqa: E501
        recipient_filter.finalized_only = export_recipient.finalized_only
        recipient_filter.tasks_with_patient_only = (
            not export_recipient.anonymous_ok()
        )
        recipient_filter.must_have_idnum_type = (
            export_recipient.primary_idnum
        )
        taskfilter = recipient_filter
    else:
        assert (
            taskfilter
        ), "Must supply taskfilter unless you supply export_recipient"

    # What was asked for:
    self._req = req
    self._filter = taskfilter
    self._as_dump = as_dump
    self._sort_method_by_class = sort_method_by_class
    self._sort_method_global = sort_method_global
    self._current_only = current_only
    self._via_index = via_index
    self.export_recipient = export_recipient

    # Results, filled in lazily:
    self._tasks_by_class = (
        OrderedDict()
    )  # type: Dict[Type[Task], List[Task]]
    self._all_tasks = None  # type: Optional[List[Task]]
    self._all_indexes = (
        None
    )  # type: Optional[Union[List[TaskIndexEntry], Query]]
def __repr__(self) -> str:
    """
    Representation produced by ``auto_repr`` from cardinal_pythonlib.
    """
    return auto_repr(self)
def __str__(self) -> str:
    """
    String form produced by ``auto_str`` from cardinal_pythonlib.
    """
    return auto_str(self)
306 # =========================================================================
307 # Interface to read
308 # =========================================================================
@property
def req(self) -> "CamcopsRequest":
    """
    The request this collection works with.

    Raises:
        AssertionError: if no request was given at construction and
            :meth:`set_request` has not been called since
    """
    request = self._req
    assert (
        request is not None
    ), "Must initialize with a request or call set_request() first"
    return request
def set_request(self, req: "CamcopsRequest") -> None:
    """
    Attach (or replace) the request object by hand. Used by Celery
    back-end tasks.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
    """
    self._req = req
def task_classes(self) -> List[Type[Task]]:
    """
    The task classes we are interested in, as reported by our filter.
    """
    task_filter = self._filter
    return task_filter.task_classes
def tasks_for_task_class(self, task_class: Type[Task]) -> List[Task]:
    """
    All matching task instances of one task type (fetching first, if that
    has not happened yet). An empty list if there are none.
    """
    if not self._via_index:
        # Direct route: only this class needs to be loaded.
        self._fetch_task_class(task_class)
    else:
        # Index route: everything is loaded in one go.
        self._ensure_everything_fetched_via_index()
    return self._tasks_by_class.get(task_class, [])
@property
def all_tasks(self) -> List[Task]:
    """
    Every matching task instance, as one list (fetched on first use).
    """
    if self._all_tasks is not None:
        return self._all_tasks  # already loaded
    if self._via_index:
        self._ensure_everything_fetched_via_index()
    else:
        self._fetch_all_tasks_without_index()
    return self._all_tasks
@property
def all_tasks_or_indexes_or_query(
    self,
) -> Union[List[Task], List[TaskIndexEntry], Query]:
    """
    The cheapest available representation of "all matching tasks": the
    task instances themselves, or their index entries, or a query that
    will yield those index entries.

    Why bother? The caller may paginate the result (e.g. the main task
    view):

    - Handing back tasks works, but may mean fetching e.g. 20,000 tasks
      to display 20.
    - Better is to fetch 20,000 index entries from the single index table
      and then only the 20 tasks needed.
    - Better still is to fetch just the 20 index entries needed, and then
      their tasks.
    """
    if self._via_index:
        self._build_index_query()  # makes sure self._all_indexes is set
        fetched_tasks = self._all_tasks
        if fetched_tasks is None:
            return self._all_indexes  # indexes, or a query for them
        return fetched_tasks  # the tasks have already been loaded
    return self.all_tasks
386 # def forget_task_class(self, task_class: Type[Task]) -> None:
387 # """
388 # Ditch results for a specific task class (for memory efficiency).
389 # """
390 # self._tasks_by_class.pop(task_class, None)
391 # # The "None" option prevents it from raising KeyError if the key
392 # # doesn't exist.
393 # # https://stackoverflow.com/questions/11277432/how-to-remove-a-key-from-a-python-dictionary # noqa
def gen_all_tasks_or_indexes(
    self,
) -> Generator[Union[Task, TaskIndexEntry], None, None]:
    """
    Yields each task or index entry in turn, running the index query
    first if that is what we currently hold.
    """
    items = self.all_tasks_or_indexes_or_query
    if isinstance(items, Query):
        items = items.all()
    yield from items
def gen_tasks_by_class(self) -> Generator[Task, None, None]:
    """
    Yields every task, one task class after another.
    """
    for task_class in self.task_classes():
        yield from self.tasks_for_task_class(task_class)
def gen_tasks_in_global_order(self) -> Generator[Task, None, None]:
    """
    Yields every task, in the order of the combined (global) task list.
    """
    yield from self.all_tasks
@property
def dbsession(self) -> SqlASession:
    """
    The database session belonging to our request.
    """
    request = self.req
    return request.dbsession
431 # =========================================================================
432 # Internals: fetching Task objects
433 # =========================================================================
def _fetch_all_tasks_without_index(self, parallel: bool = False) -> None:
    """
    Load all wanted tasks directly from the task tables (no index), then
    build the combined, globally sorted list in ``self._all_tasks``.

    Args:
        parallel: use one :class:`FetchThread` per task class? AVOID
            ``True``; see the notes on parallel fetching earlier in this
            module.

    Raises:
        ValueError: if a thread of a parallel fetch reports an error
    """
    timing = DEBUG_QUERY_TIMING
    start_time = Pendulum.now() if timing else None
    wanted_classes = self._filter.task_classes

    if not parallel:
        # The normal route: one task class after another.
        for task_class in wanted_classes:
            self._fetch_task_class(task_class)
    else:
        # Deprecated parallel fetch
        workers = []  # type: List[FetchThread]
        for task_class in wanted_classes:
            worker = FetchThread(self.req, task_class, self)
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
            if worker.error:
                raise ValueError("Multithreaded fetch failed")

    if timing:
        time_taken = Pendulum.now() - start_time
        log.info("_fetch_all_tasks took {}", time_taken)

    # Flatten the per-class lists into the joint list, then sort that.
    self._all_tasks = [
        task
        for class_tasks in self._tasks_by_class.values()
        for task in class_tasks
    ]  # type: ignore[no-redef]
    sort_tasks_in_place(self._all_tasks, self._sort_method_global)
473 def _fetch_task_class(self, task_class: Type[Task]) -> None:
474 """
475 Fetch tasks from the database for one task type.
476 """
477 if task_class in self._tasks_by_class:
478 return # already fetched
479 q = self._serial_query(task_class)
480 if q is None:
481 newtasks = [] # type: List[Task]
482 else:
483 newtasks = q.all() # type: ignore[no-redef] # type: List[Task]
484 # Apply Python-side filters?
485 newtasks = self._filter_through_python(newtasks)
486 sort_tasks_in_place(newtasks, self._sort_method_by_class)
487 self._tasks_by_class[task_class] = newtasks
489 def _serial_query(self, task_class: Type[Task]) -> Optional[Query]:
490 """
491 Make and return an SQLAlchemy ORM query for a specific task class.
493 Returns ``None`` if no tasks would match our criteria.
494 """
495 dbsession = self.req.dbsession
496 return self._make_query(dbsession, task_class)
def _make_query(
    self, dbsession: SqlASession, task_class: Type[Task]
) -> Optional[Query]:
    """
    Build the SQLAlchemy ORM query for one task class, on the database
    session supplied.

    Returns ``None`` if no tasks would match our criteria.
    """
    query = dbsession.query(task_class)

    # Restrict to what the web front end will supply
    if self._current_only:
        # noinspection PyProtectedMember
        query = query.filter(task_class._current == True)  # noqa: E712

    # Restrict to what is PERMITTED
    query = task_query_restricted_to_permitted_users(
        self.req, query, task_class, as_dump=self._as_dump
    )
    if not query:
        return query

    # Restrict to what is DESIRED
    query = self._task_query_restricted_by_filter(query, task_class)
    if not query or not self.export_recipient:
        return query
    return self._task_query_restricted_by_export_recipient(
        query, task_class
    )
def _task_query_restricted_by_filter(
    self, q: Query, cls: Type[Task]
) -> Optional[Query]:
    """
    Narrows an SQLAlchemy ORM query on one task class to the tasks that
    our filter lets through.

    THIS IS A KEY SECURITY FUNCTION, since it implements some permissions
    that relate to viewing tasks when unfiltered.

    Args:
        q: the starting SQLAlchemy ORM Query
        cls: the task class

    Returns:
        the original query, a modified query, or ``None`` if no tasks
        would pass the filter
    """
    task_filter = self._filter
    user = self.req.user

    # Take a copy of the filter's group IDs; None means "unrestricted".
    if task_filter.group_ids:
        wanted_group_ids = task_filter.group_ids.copy()
    else:
        wanted_group_ids = None

    if task_filter.dates_inconsistent():
        return None
    if cls not in task_filter.task_classes:
        return None  # we don't want this task

    if not cls.is_anonymous:
        unfiltered = not task_filter.any_specific_patient_filtering()
        # With no patient filtering, permissions depend on user settings.
        # A user who may view all patients when unfiltered needs no
        # further restriction here.
        if unfiltered and not user.may_view_all_patients_when_unfiltered:
            if user.may_view_no_patients_when_unfiltered:
                # (a) User not permitted to view any patients when
                # unfiltered, and (b) not filtered to a level that would
                # reasonably restrict to one or a small number of
                # patients. Skip the task class.
                return None
            # May see patient data from some, but not all, groups.
            liberal_group_ids = (
                user.group_ids_nonsuperuser_may_see_when_unfiltered()
            )
            if wanted_group_ids:  # was restricted; restrict further
                wanted_group_ids = [
                    gid
                    for gid in wanted_group_ids
                    if gid in liberal_group_ids
                ]
                if not wanted_group_ids:
                    return None  # down to zero; no point continuing
            else:  # was unrestricted
                wanted_group_ids = liberal_group_ids

        # Patient filtering
        if task_filter.any_patient_filtering():
            # Join via the explicitly configured relationship
            # (q.join(Patient) fails).
            q = q.join(cls.patient)  # type: ignore[arg-type]
            q = task_filter.filter_query_by_patient(q, via_index=False)

    # Patient-independent filtering

    if task_filter.device_ids:
        # noinspection PyProtectedMember
        q = q.filter(cls._device_id.in_(task_filter.device_ids))

    if task_filter.era:
        # noinspection PyProtectedMember
        q = q.filter(cls._era == task_filter.era)
    if task_filter.finalized_only:
        # noinspection PyProtectedMember
        q = q.filter(cls._era != ERA_NOW)

    if task_filter.adding_user_ids:
        # noinspection PyProtectedMember
        q = q.filter(
            cls._adding_user_id.in_(task_filter.adding_user_ids)
        )

    if wanted_group_ids:
        # noinspection PyProtectedMember
        q = q.filter(cls._group_id.in_(wanted_group_ids))

    # Creation time window: start inclusive, end exclusive.
    if task_filter.start_datetime is not None:
        q = q.filter(cls.when_created >= task_filter.start_datetime)
    if task_filter.end_datetime is not None:
        q = q.filter(cls.when_created < task_filter.end_datetime)

    return self._filter_query_for_text_contents(q, cls)
def _task_query_restricted_by_export_recipient(
    self, q: Query, cls: Type[Task]
) -> Optional[Query]:
    """
    For exports.

    Applies those parts of our
    :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
    that are not already covered by our
    :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`.

    In practice that means incremental exports: finding tasks that have
    not yet been exported. A task counts as exported if it has been sent
    successfully to a recipient of the same name (regardless of
    ``ExportRecipient.id``, which changes when the export recipient is
    reconfigured).

    Compare :meth:`_index_query_restricted_by_export_recipient`.

    Args:
        q: the starting SQLAlchemy ORM Query
        cls: the task class

    Returns:
        the original query, a modified query, or ``None`` if no tasks
        would pass the filter
    """
    from camcops_server.cc_modules.cc_exportmodels import (
        ExportedTask,
    )  # delayed import

    recipient = self.export_recipient
    if not recipient.is_incremental():
        # Full database export; no restrictions
        return q

    # Otherwise, restrict to tasks not yet sent to this recipient.
    # noinspection PyUnresolvedReferences
    exported_tasks_with_recipients = ExportedTask.__table__.join(
        ExportRecipient.__table__,
        ExportedTask.recipient_id == ExportRecipient.id,
    )
    # noinspection PyProtectedMember
    successful_export_of_this_task = and_(
        ExportRecipient.recipient_name == recipient.recipient_name,
        ExportedTask.basetable == cls.__tablename__,
        ExportedTask.task_server_pk == cls._pk,
        ExportedTask.success == True,  # noqa: E712
        ExportedTask.cancelled == False,  # noqa: E712
    )
    already_exported = (
        exists()
        .select_from(exported_tasks_with_recipients)
        .where(successful_export_of_this_task)
    )
    # "There is not a successful export record for this task/recipient"
    return q.filter(~already_exported)
685 def _filter_through_python(self, tasks: List[Task]) -> List[Task]:
686 """
687 Returns those tasks in the list provided that pass any Python-only
688 aspects of our filter (those parts not easily calculable via SQL).
690 This applies to the "direct" (and not "via index") routes only. With
691 the index, we can do everything via SQL.
692 """
693 assert not self._via_index
694 if not self._has_python_parts_to_filter():
695 return tasks
696 return [
697 t for t in tasks if self._task_matches_python_parts_of_filter(t)
698 ]
700 def _has_python_parts_to_filter(self) -> bool:
701 """
702 Does the filter have aspects to it that require some Python thought,
703 not just a database query?
705 Only applicable to the direct (not "via index") route.
706 """
707 assert not self._via_index
708 return self._filter.complete_only
710 def _task_matches_python_parts_of_filter(self, task: Task) -> bool:
711 """
712 Does the task pass the Python parts of the filter?
714 Only applicable to the direct (not "via index") route.
715 """
716 assert not self._via_index
718 # "Is task complete" filter
719 if self._filter.complete_only:
720 if not task.is_complete():
721 return False
723 return True
725 # =========================================================================
726 # Shared between Task and TaskIndexEntry methods
727 # =========================================================================
729 def _filter_query_for_text_contents(
730 self, q: Query, taskclass: Type[Task]
731 ) -> Optional[Query]:
732 """
733 Returns the query, filtered for the "text contents" filter.
735 Args:
736 q: the starting SQLAlchemy ORM Query
737 taskclass: the task class
739 Returns:
740 a Query, potentially modified.
741 """
742 tf = self._filter # task filter
744 if not tf.text_contents:
745 return q # unmodified
747 # task must contain ALL the strings in AT LEAST ONE text column
748 textcols = taskclass.get_text_filter_columns()
749 if not textcols:
750 # Text filtering requested, but there are no text columns, so
751 # by definition the filter must fail.
752 return None
753 clauses_over_text_phrases = [] # type: List[ColumnElement]
754 # ... each e.g. "col1 LIKE '%paracetamol%' OR col2 LIKE '%paracetamol%'" # noqa
755 for textfilter in tf.text_contents:
756 tf_lower = textfilter.lower()
757 clauses_over_columns = [] # type: List[ColumnElement]
758 # ... each e.g. "col1 LIKE '%paracetamol%'"
759 for textcol in textcols:
760 # Case-insensitive comparison:
761 # https://groups.google.com/forum/#!topic/sqlalchemy/331XoToT4lk
762 # https://bitbucket.org/zzzeek/sqlalchemy/wiki/UsageRecipes/StringComparisonFilter # noqa
763 clauses_over_columns.append(
764 func.lower(textcol).contains(tf_lower, autoescape=True)
765 )
766 clauses_over_text_phrases.append(or_(*clauses_over_columns))
767 return q.filter(and_(*clauses_over_text_phrases))
768 # ... thus, e.g.
769 # "(col1 LIKE '%paracetamol%' OR col2 LIKE '%paracetamol%') AND
770 # (col1 LIKE '%overdose%' OR col2 LIKE '%overdose%')
772 # =========================================================================
773 # Internals: fetching TaskIndexEntry objects
774 # =========================================================================
776 def _ensure_everything_fetched_via_index(self) -> None:
777 """
778 Ensure we have all our tasks loaded, using the index.
779 """
780 self._build_index_query()
781 self._fetch_tasks_from_indexes()
783 def _build_index_query(self) -> None:
784 """
785 Creates a Query in :attr:`_all_indexes` that will fetch task indexes.
786 If the task filtering requires the tasks to be fetched (i.e. text
787 contents), fetch the actual tasks too (and filter them).
788 """
789 if self._all_indexes is not None:
790 return
791 self._all_indexes = self._make_index_query()
792 if self._filter.text_contents:
793 self._fetch_tasks_from_indexes()
def _fetch_tasks_from_indexes(self) -> None:
    """
    Takes the query (or index list) already stored in
    :attr:`_all_indexes`, and populates the task attributes,
    :attr:`_all_tasks` and :attr:`_tasks_by_class`.

    Does nothing if the tasks have already been fetched. As a side
    effect, a query in :attr:`_all_indexes` is replaced by the index
    entries it returns.
    """
    if self._all_tasks is not None:
        return
    assert self._all_indexes is not None

    d = tablename_to_task_class_dict()
    dbsession = self.req.dbsession
    self._all_tasks = []  # type: ignore[no-redef]  # type: List[Task]

    # Fetch indexes
    if isinstance(self._all_indexes, Query):
        # Query built, but indexes not yet fetched.
        # Replace the query with actual indexes
        self._all_indexes = (  # type: ignore[no-redef]
            self._all_indexes.all()
        )  # type: List[TaskIndexEntry]
    indexes = self._all_indexes

    # Group the task PKs by table in a single pass over the indexes
    # (rather than rescanning every index once per table).
    # NOTE(review): assumes TaskIndexEntry.tablename mirrors
    # task_table_name -- confirm.
    pks_by_tablename = {}  # type: Dict[str, List[int]]
    for index in indexes:
        pks_by_tablename.setdefault(index.task_table_name, []).append(
            index.task_pk
        )

    # Fetch tasks
    for tablename, task_pks in pks_by_tablename.items():
        # We do this by task class, so we can execute a single query per
        # task type (rather than per task).
        try:
            taskclass = d[tablename]
        except KeyError:
            log.warning("Bad tablename in index: {!r}", tablename)
            continue
        tasklist = self._tasks_by_class.setdefault(taskclass, [])
        # noinspection PyProtectedMember
        qtask = dbsession.query(taskclass).filter(
            taskclass._pk.in_(task_pks)
        )
        qtask = self._filter_query_for_text_contents(qtask, taskclass)
        if qtask is None:
            # Text filtering was requested but this task class has no
            # text columns, so none of its tasks can match.
            continue
        tasks = qtask.all()  # type: List[Task]
        tasklist.extend(tasks)
        self._all_tasks.extend(tasks)

    # Sort tasks
    for tasklist in self._tasks_by_class.values():
        sort_tasks_in_place(tasklist, self._sort_method_by_class)
    sort_tasks_in_place(self._all_tasks, self._sort_method_global)
def _make_index_query(self) -> Optional[Query]:
    """
    Build the SQLAlchemy ORM query that retrieves task index entries.

    Returns ``None`` if no tasks would match our criteria.
    """
    query = self.req.dbsession.query(TaskIndexEntry)

    # Restrict to what the web front end will supply
    assert self._current_only, "_current_only must be true to use index"

    # Restrict to what is PERMITTED
    # (this restriction is not applied when there is an export recipient)
    if not self.export_recipient:
        query = task_query_restricted_to_permitted_users(
            self.req, query, TaskIndexEntry, as_dump=self._as_dump
        )
    if not query:
        return query

    # Restrict to what is DESIRED
    query = self._index_query_restricted_by_filter(query)
    if not query or not self.export_recipient:
        return query
    return self._index_query_restricted_by_export_recipient(query)
def _index_query_restricted_by_filter(self, q: Query) -> Optional[Query]:
    """
    The index-table counterpart of
    :func:`_task_query_restricted_by_filter`: narrows a query on
    :class:`TaskIndexEntry` to what our filter lets through, and embeds
    the global sort order.

    THIS IS A KEY SECURITY FUNCTION, since it implements some permissions
    that relate to viewing tasks when unfiltered.

    Args:
        q: the starting SQLAlchemy ORM Query

    Returns:
        the original query, a modified query, or ``None`` if no tasks
        would pass the filter
    """
    task_filter = self._filter
    user = self.req.user

    # Take a copy of the filter's group IDs; None means "unrestricted".
    if task_filter.group_ids:
        wanted_group_ids = task_filter.group_ids.copy()
    else:
        wanted_group_ids = None

    if task_filter.dates_inconsistent():
        return None

    # Task type filtering

    if task_filter.skip_anonymous_tasks():
        # noinspection PyPep8
        q = q.filter(TaskIndexEntry.patient_pk != None)  # noqa: E711

    if not task_filter.offers_all_non_anonymous_task_types():
        wanted_tablenames = [
            task_class.__tablename__
            for task_class in task_filter.task_classes
        ]
        q = q.filter(
            TaskIndexEntry.task_table_name.in_(wanted_tablenames)
        )

    # Special rules when we've not filtered for any patients

    unfiltered = not task_filter.any_specific_patient_filtering()
    # With no patient filtering, permissions depend on user settings.
    # A user who may view all patients when unfiltered needs no further
    # restriction here.
    if unfiltered and not user.may_view_all_patients_when_unfiltered:
        if user.may_view_no_patients_when_unfiltered:
            # (a) User not permitted to view any patients when
            # unfiltered, and (b) not filtered to a level that would
            # reasonably restrict to one or a small number of
            # patients. Restrict to anonymous tasks.
            # noinspection PyPep8
            q = q.filter(TaskIndexEntry.patient_pk == None)  # noqa: E711
        else:
            # May see patient data from some, but not all, groups.
            # This is a little more complex than the equivalent in
            # _task_query_restricted_by_filter(), because we shouldn't
            # restrict anonymous tasks.
            liberal_group_ids = (
                user.group_ids_nonsuperuser_may_see_when_unfiltered()
            )
            # Anonymous is OK; so is any of the liberal groups.
            # noinspection PyPep8
            acceptable = [
                TaskIndexEntry.patient_pk == None  # noqa: E711
            ]  # type: List[ClauseElement]
            acceptable.extend(
                TaskIndexEntry.group_id == gid for gid in liberal_group_ids
            )
            q = q.filter(or_(*acceptable))  # type: ignore[arg-type]

    # Patient filtering

    if task_filter.any_patient_filtering():
        q = q.join(TaskIndexEntry.patient)  # use relationship
        q = task_filter.filter_query_by_patient(q, via_index=True)

    # Patient-independent filtering

    if task_filter.device_ids:
        q = q.filter(TaskIndexEntry.device_id.in_(task_filter.device_ids))

    if task_filter.era:
        q = q.filter(TaskIndexEntry.era == task_filter.era)
    if task_filter.finalized_only:
        q = q.filter(TaskIndexEntry.era != ERA_NOW)

    if task_filter.adding_user_ids:
        q = q.filter(
            TaskIndexEntry.adding_user_id.in_(task_filter.adding_user_ids)
        )

    if wanted_group_ids:
        q = q.filter(TaskIndexEntry.group_id.in_(wanted_group_ids))

    # Creation time window: start inclusive, end exclusive.
    if task_filter.start_datetime is not None:
        q = q.filter(
            TaskIndexEntry.when_created_utc
            >= task_filter.start_datetime_utc
        )
    if task_filter.end_datetime is not None:
        q = q.filter(
            TaskIndexEntry.when_created_utc < task_filter.end_datetime_utc
        )

    # text_contents is managed at the later fetch stage when using indexes

    # But is_complete can be filtered now and in SQL:
    if task_filter.complete_only:
        # noinspection PyPep8
        q = q.filter(TaskIndexEntry.task_is_complete == True)  # noqa: E712

    # When we use indexes, we embed the global sort criteria in the query.
    global_sort = self._sort_method_global
    if global_sort == TaskSortMethod.CREATION_DATE_ASC:
        q = q.order_by(
            TaskIndexEntry.when_created_utc.asc(),
            TaskIndexEntry.when_added_batch_utc.asc(),
        )
    elif global_sort == TaskSortMethod.CREATION_DATE_DESC:
        q = q.order_by(
            TaskIndexEntry.when_created_utc.desc(),
            TaskIndexEntry.when_added_batch_utc.desc(),
        )

    return q
def _index_query_restricted_by_export_recipient(
    self, q: Query
) -> Optional[Query]:
    """
    Applies export-recipient restrictions to a task index query.

    Used for exports. Everything that the
    :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter` already
    handles is left alone; this method only adds what is specific to our
    :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`.
    In practice that means incremental exports: keeping only tasks for
    which this recipient has no successful, non-cancelled export record.

    Compare :meth:`_task_query_restricted_by_export_recipient`.

    Args:
        q: the starting SQLAlchemy ORM Query (on ``TaskIndexEntry``)

    Returns:
        the original query (non-incremental recipient) or a further
        filtered query (incremental recipient). The signature permits
        ``None`` ("no tasks would pass the filter"), but this
        implementation always returns a query.

    """
    from camcops_server.cc_modules.cc_exportmodels import (
        ExportedTask,
    )  # delayed import

    recipient = self.export_recipient

    if recipient.is_incremental():
        # Incremental export: exclude anything already sent successfully
        # to a recipient of this name.

        # Export records joined to the recipient they were sent to.
        # noinspection PyUnresolvedReferences
        export_records = ExportedTask.__table__.join(
            ExportRecipient.__table__,
            ExportedTask.recipient_id == ExportRecipient.id,
        )

        # Correlates an export record with the current index row and
        # requires it to be a successful, non-cancelled export to our
        # recipient.
        matches_this_task = and_(
            ExportRecipient.recipient_name == recipient.recipient_name,
            # Compare against the column "task_table_name", since a Python
            # property such as ".tablename" doesn't play nicely with
            # SQLAlchemy here.
            ExportedTask.basetable == TaskIndexEntry.task_table_name,
            ExportedTask.task_server_pk == TaskIndexEntry.task_pk,
            ExportedTask.success == True,  # noqa: E712
            ExportedTask.cancelled == False,  # noqa: E712
        )

        # "There is a successful export record for this task/recipient"
        already_exported = (
            exists().select_from(export_records).where(matches_this_task)
        )

        # ... and we want the tasks for which that is NOT the case.
        q = q.filter(~already_exported)

    # For a non-incremental recipient (full database export) we fall
    # through with the query untouched.
    return q
# noinspection PyProtectedMember
def encode_task_collection(coll: TaskCollection) -> Dict:
    """
    Converts a :class:`TaskCollection` into a dictionary for serialization.

    Only the task filter, the "as dump" flag, and the per-class sort method
    are stored. The filter and the sort method are each passed through
    kombu's ``dumps`` with the JSON serializer; the flag is stored as it is.

    The request is deliberately left out and has to be re-created by other
    means; see e.g.
    :func:`camcops_server.cc_modules.celery.email_basic_dump`.
    """
    encoded = {}  # type: Dict[str, Any]
    encoded["taskfilter"] = dumps(coll._filter, serializer="json")
    encoded["as_dump"] = coll._as_dump
    encoded["sort_method_by_class"] = dumps(
        coll._sort_method_by_class, serializer="json"
    )
    return encoded
# noinspection PyUnusedLocal
def decode_task_collection(d: Dict, cls: Type) -> TaskCollection:
    """
    Rebuilds a :class:`TaskCollection` from its serialized dictionary form.

    The ``taskfilter`` and ``sort_method_by_class`` entries are unpacked,
    put into the argument order that kombu's ``loads`` expects (via
    :func:`reorder_args`), and deserialized; ``as_dump`` is used as stored.
    The ``cls`` argument is accepted but not used.

    The request was never serialized, so the collection is created with
    ``req=None`` and the request has to be re-created by other means; see
    e.g. :func:`camcops_server.cc_modules.celery.email_basic_dump`.
    """
    taskfilter = loads(*reorder_args(*d["taskfilter"]))
    as_dump = d["as_dump"]
    sort_method_by_class = loads(*reorder_args(*d["sort_method_by_class"]))
    return TaskCollection(
        req=None,
        taskfilter=taskfilter,
        as_dump=as_dump,
        sort_method_by_class=sort_method_by_class,
    )
def reorder_args(
    content_type: str, content_encoding: str, data: str
) -> List[str]:
    """
    Moves ``data`` from the last position to the first.

    kombu's :func:`SerializerRegistry.dumps` returns the data as the last
    element of its tuple, whereas :func:`SerializerRegistry.loads` takes
    the data as its first argument. This converts from the former order to
    the latter.

    Returns:
        the list ``[data, content_type, content_encoding]``
    """
    loads_order = (data, content_type, content_encoding)
    return list(loads_order)
# Tell the JSON serialization framework how to convert a TaskCollection to
# and from a dictionary.
register_class_for_json(
    cls=TaskCollection,
    dict_to_obj_fn=decode_task_collection,
    obj_to_dict_fn=encode_task_collection,
)