Coverage for cc_modules/cc_taskfilter.py: 68%
210 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
1"""
2camcops_server/cc_modules/cc_taskfilter.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Representation of filtering criteria for tasks.**
28"""
30import datetime
31from enum import Enum
32import logging
33from typing import Any, Dict, List, Optional, Type, TYPE_CHECKING, Union
35from cardinal_pythonlib.datetimefunc import convert_datetime_to_utc
36from cardinal_pythonlib.json.serialize import register_class_for_json
37from cardinal_pythonlib.logs import BraceStyleAdapter
38from cardinal_pythonlib.reprfunc import auto_repr
39from cardinal_pythonlib.sqlalchemy.list_types import (
40 IntListType,
41 StringListType,
42)
43from pendulum import DateTime as Pendulum
44from sqlalchemy.orm import Mapped, mapped_column, Query, reconstructor
45from sqlalchemy.sql.functions import func
46from sqlalchemy.sql.expression import and_, or_
48from camcops_server.cc_modules.cc_cache import cache_region_static, fkg
49from camcops_server.cc_modules.cc_device import Device
50from camcops_server.cc_modules.cc_group import Group
51from camcops_server.cc_modules.cc_patient import Patient
52from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
53from camcops_server.cc_modules.cc_simpleobjects import IdNumReference
54from camcops_server.cc_modules.cc_sqla_coltypes import (
55 PendulumDateTimeAsIsoTextColType,
56 IdNumReferenceListColType,
57 PatientNameColType,
58 SexColType,
59)
60from camcops_server.cc_modules.cc_sqlalchemy import Base
61from camcops_server.cc_modules.cc_task import (
62 tablename_to_task_class_dict,
63 Task,
64)
65from camcops_server.cc_modules.cc_taskindex import PatientIdNumIndexEntry
66from camcops_server.cc_modules.cc_user import User
68if TYPE_CHECKING:
69 from sqlalchemy.sql.elements import ColumnElement
70 from camcops_server.cc_modules.cc_request import CamcopsRequest
# Module-level logger, wrapped so that log calls can use brace-style
# ("{}") message formatting.
log = BraceStyleAdapter(logging.getLogger(name=__name__))
75# =============================================================================
76# Sorting helpers
77# =============================================================================
class TaskClassSortMethod(Enum):
    """
    The available orderings for a list of task types (task classes).

    Used by :func:`sort_task_classes_in_place` to choose the sort key.
    """

    # Leave the list in whatever order it already has.
    NONE = 0
    # Order by each class's ``tablename`` attribute.
    TABLENAME = 1
    # Order by each class's ``shortname`` attribute.
    SHORTNAME = 2
    # Order by each class's ``longname(req)``; this one needs a request.
    LONGNAME = 3
def sort_task_classes_in_place(
    classlist: List[Type[Task]],
    sortmethod: TaskClassSortMethod,
    req: "CamcopsRequest" = None,
) -> None:
    """
    Reorder a list of task classes, modifying the list itself.

    Args:
        classlist:
            the task classes to sort; mutated in place
        sortmethod:
            a :class:`TaskClassSortMethod` enum selecting the sort key; any
            value other than ``TABLENAME``, ``SHORTNAME`` or ``LONGNAME``
            (e.g. ``NONE``) leaves the list untouched
        req:
            a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`;
            must be supplied for ``LONGNAME`` sorting, because it is passed
            to each class's ``longname()``
    """
    if sortmethod == TaskClassSortMethod.LONGNAME:
        assert req is not None
        classlist.sort(key=lambda task_class: task_class.longname(req))
        return
    if sortmethod == TaskClassSortMethod.SHORTNAME:
        classlist.sort(key=lambda task_class: task_class.shortname)
        return
    if sortmethod == TaskClassSortMethod.TABLENAME:
        classlist.sort(key=lambda task_class: task_class.tablename)
113# =============================================================================
114# Cache task class mapping
115# =============================================================================
116# Function, staticmethod, classmethod?
117# https://stackoverflow.com/questions/8108688/in-python-when-should-i-use-a-function-instead-of-a-method # noqa
118# https://stackoverflow.com/questions/11788195/module-function-vs-staticmethod-vs-classmethod-vs-no-decorators-which-idiom-is # noqa
119# https://stackoverflow.com/questions/15017734/using-static-methods-in-python-best-practice # noqa
def task_classes_from_table_names(
    tablenames: List[str],
    sortmethod: TaskClassSortMethod = TaskClassSortMethod.NONE,
) -> List[Type[Task]]:
    """
    Look up the task class for each task base table name, and return the
    classes sorted as requested.

    Args:
        tablenames: list of task base table names
        sortmethod: a :class:`TaskClassSortMethod` enum (``LONGNAME`` is not
            permitted here, since no request object is available)

    Returns:
        a list of task classes, in the order requested

    Raises:
        :exc:`KeyError` if a table name is invalid

    """
    assert sortmethod != TaskClassSortMethod.LONGNAME
    class_for_tablename = tablename_to_task_class_dict()
    # Plain indexing, so that an unknown table name raises KeyError.
    found = [
        class_for_tablename[name] for name in tablenames
    ]  # type: List[Type[Task]]
    sort_task_classes_in_place(found, sortmethod)
    return found
@cache_region_static.cache_on_arguments(function_key_generator=fkg)
def all_tracker_task_classes() -> List[Type[Task]]:
    """
    Collect every task class whose ``provides_trackers`` attribute is truthy.

    Classes are visited in the order given by
    ``Task.all_subclasses_by_shortname()``. The function is decorated with
    ``cache_region_static.cache_on_arguments``, so the result is presumably
    computed once and then served from that cache.
    """
    tracker_classes = []  # type: List[Type[Task]]
    for task_class in Task.all_subclasses_by_shortname():
        if task_class.provides_trackers:
            tracker_classes.append(task_class)
    return tracker_classes
163# =============================================================================
164# Define a filter to apply to tasks
165# =============================================================================
class TaskFilter(Base):
    """
    SQLAlchemy ORM object representing task filter criteria.

    The mapped columns below are stored in the ``_task_filters`` table. A
    few further attributes exist on the Python side only and are not saved:

    - ``era``, ``finalized_only``, ``must_have_idnum_type``: extra filtering
      criteria;
    - ``_sort_method``: how :attr:`task_classes` is ordered;
    - ``_task_classes``: cache for :attr:`task_classes` (``None`` means "not
      yet calculated").

    The Python-only attributes are set both in :meth:`__init__` and in
    :meth:`init_on_load` (the SQLAlchemy reconstructor).
    """

    __tablename__ = "_task_filters"

    # Lots of these could be changed into lists; for example, filtering to
    # multiple devices, multiple users, multiple text patterns. For
    # AND-joining, there is little clear benefit (one could always AND-join
    # multiple filters with SQL). For OR-joining, this is more useful.
    # - surname: use ID numbers instead; not very likely to have >1 surname
    # - forename: ditto
    # - DOB: ditto
    # - sex: just eliminate the filter if you don't care about sex
    # - task_types: needs a list
    # - device_id: might as well make it a list
    # - user_id: might as well make it a list
    # - group_id: might as well make it a list
    # - start_datetime: single only
    # - end_datetime: single only
    # - text_contents: might as well make it a list
    # - ID numbers: a list, joined with OR.
    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        index=True,
        comment="Task filter ID (arbitrary integer)",
    )
    # Task type filters
    task_types: Mapped[Optional[list[str]]] = mapped_column(
        StringListType,
        comment="Task filter: task type(s), as CSV list of table names",
    )
    tasks_offering_trackers_only: Mapped[Optional[bool]] = mapped_column(
        comment="Task filter: restrict to tasks offering trackers only?",
    )
    tasks_with_patient_only: Mapped[Optional[bool]] = mapped_column(
        comment="Task filter: restrict to tasks with a patient (non-anonymous "
        "tasks) only?",
    )
    # Patient-related filters
    surname: Mapped[Optional[str]] = mapped_column(
        PatientNameColType, comment="Task filter: surname"
    )
    forename: Mapped[Optional[str]] = mapped_column(
        PatientNameColType, comment="Task filter: forename"
    )
    dob: Mapped[Optional[datetime.date]] = mapped_column(
        comment="Task filter: DOB"
    )
    sex: Mapped[Optional[str]] = mapped_column(
        SexColType, comment="Task filter: sex"
    )
    # new in v2.0.1
    idnum_criteria: Mapped[Optional[List[IdNumReference]]] = mapped_column(
        IdNumReferenceListColType,
        comment="ID filters as JSON; the ID number definitions are joined "
        "with OR",
    )
    # Other filters
    device_ids: Mapped[Optional[List[int]]] = mapped_column(
        IntListType,
        comment="Task filter: source device ID(s), as CSV",
    )
    adding_user_ids: Mapped[Optional[List[int]]] = mapped_column(
        "user_ids",
        IntListType,
        comment="Task filter: adding (uploading) user ID(s), as CSV",
    )
    group_ids: Mapped[Optional[List[int]]] = mapped_column(
        IntListType, comment="Task filter: group ID(s), as CSV"
    )
    start_datetime: Mapped[Optional[Pendulum]] = mapped_column(
        "start_datetime_iso8601",
        PendulumDateTimeAsIsoTextColType,
        comment="Task filter: start date/time (UTC as ISO8601)",
    )
    end_datetime: Mapped[Optional[Pendulum]] = mapped_column(
        "end_datetime_iso8601",
        PendulumDateTimeAsIsoTextColType,
        comment="Task filter: end date/time (UTC as ISO8601)",
    )
    # Implemented on the Python side for indexed lookup:
    text_contents: Mapped[Optional[List[str]]] = mapped_column(
        StringListType,
        comment="Task filter: filter text fields",
    )  # task must contain ALL the strings in AT LEAST ONE of its text columns
    # Implemented on the Python side for non-indexed lookup:
    complete_only: Mapped[Optional[bool]] = mapped_column(
        comment="Task filter: task complete?"
    )

    def __init__(self, **kwargs: Any) -> None:
        """
        Create a filter, optionally presetting mapped attributes.

        Args:
            kwargs: passed to the SQLAlchemy declarative constructor
        """
        super().__init__(**kwargs)
        # We need to initialize these explicitly, because if we create an
        # instance via "x = TaskFilter()", they will be initialized to None,
        # without any recourse to our database to-and-fro conversion code for
        # each fieldtype.
        # (If we load from a database, things will be fine.)
        # Only fill in the default where nothing was supplied, so that a
        # value passed as a keyword argument is not thrown away.
        if self.idnum_criteria is None:
            self.idnum_criteria = []
        if self.device_ids is None:
            self.device_ids = []
        if self.adding_user_ids is None:
            self.adding_user_ids = []
        if self.group_ids is None:
            self.group_ids = []
        if self.text_contents is None:
            self.text_contents = []

        # ANYTHING YOU ADD BELOW HERE MUST ALSO BE IN init_on_load().
        # Or call it, of course, but we like to keep on the happy side of the
        # PyCharm type checker.

        # Python-only filtering attributes (i.e. not saved to database)
        self.era = None  # type: Optional[str]
        self.finalized_only = False  # used for exports
        self.must_have_idnum_type = None  # type: Optional[int]

        # Other Python-only attributes
        self._sort_method = TaskClassSortMethod.NONE
        self._task_classes = None  # type: Optional[List[Type[Task]]]

    @reconstructor
    def init_on_load(self) -> None:
        """
        SQLAlchemy function to recreate after loading from the database.

        Resets the Python-only attributes to the same defaults that
        :meth:`__init__` gives them.
        """
        self.era = None  # type: ignore[no-redef]  # type: Optional[str]
        self.finalized_only = False
        self.must_have_idnum_type = None  # type: ignore[no-redef]

        self._sort_method = TaskClassSortMethod.NONE
        self._task_classes = None  # type: ignore[no-redef]

    def __repr__(self) -> str:
        return auto_repr(self, with_addr=True)

    def set_sort_method(self, sort_method: TaskClassSortMethod) -> None:
        """
        Sets the sorting method for task types.

        Args:
            sort_method: a :class:`TaskClassSortMethod` enum; ``LONGNAME``
                is not permitted because no request object is stored here
        """
        assert sort_method != TaskClassSortMethod.LONGNAME, (
            "If you want to use that sorting method, you need to save a "
            "request object, because long task names use translation"
        )
        self._sort_method = sort_method
        # Any cached class list was sorted by the previous method.
        self._task_classes = None

    @property
    def task_classes(self) -> List[Type[Task]]:
        """
        Return a list of task classes permitted by the filter.

        Uses caching, since the filter will be called repeatedly.

        Raises:
            :exc:`KeyError` if ``task_types`` contains an invalid table name
        """
        if self._task_classes is None:
            if self.task_types:
                starting_classes = task_classes_from_table_names(
                    self.task_types
                )
            else:
                starting_classes = Task.all_subclasses_by_shortname()
            skip_anonymous_tasks = self.skip_anonymous_tasks()
            # Build into a local list and only store it once complete, so
            # that an exception part-way through cannot leave a half-built
            # (or empty) list behind as the cached answer.
            permitted = []  # type: List[Type[Task]]
            for cls in starting_classes:
                if (
                    self.tasks_offering_trackers_only
                    and not cls.provides_trackers
                ):
                    # Class doesn't provide trackers; skip
                    continue
                if skip_anonymous_tasks and not cls.has_patient:
                    # Anonymous task; skip
                    continue
                if self.text_contents and not cls.get_text_filter_columns():
                    # Text filter and task has no text columns; skip
                    continue
                permitted.append(cls)
            sort_task_classes_in_place(permitted, self._sort_method)
            self._task_classes = permitted
        return self._task_classes

    def skip_anonymous_tasks(self) -> bool:
        """
        Should we skip anonymous tasks?

        True if the filter is restricted to tasks with a patient, or if any
        patient-based criterion is set.
        """
        return self.tasks_with_patient_only or self.any_patient_filtering()

    def offers_all_task_types(self) -> bool:
        """
        Does this filter offer every single task class? Used for efficiency
        when using indexes. (Since ignored.)
        """
        if self.tasks_offering_trackers_only:
            return False
        if self.skip_anonymous_tasks():
            return False
        if not self.task_types:
            return True
        # Note the call: we want the set of classes, not the method object.
        return set(self.task_classes) == set(
            Task.all_subclasses_by_shortname()
        )

    def offers_all_non_anonymous_task_types(self) -> bool:
        """
        Does this filter offer every single non-anonymous task class? Used for
        efficiency when using indexes.
        """
        offered_task_classes = set(self.task_classes)
        return all(
            taskclass.is_anonymous or taskclass in offered_task_classes
            for taskclass in Task.all_subclasses_by_shortname()
        )

    @property
    def task_tablename_list(self) -> List[str]:
        """
        Returns the base table names for all task types permitted by the
        filter.
        """
        return [cls.__tablename__ for cls in self.task_classes]

    def any_patient_filtering(self) -> bool:
        """
        Is some sort of patient filtering being applied?

        That is: is any of surname, forename, DOB, sex, or ID number
        criteria set?
        """
        return (
            bool(self.surname)
            or bool(self.forename)
            or (self.dob is not None)
            or bool(self.sex)
            or bool(self.idnum_criteria)
        )

    def any_specific_patient_filtering(self) -> bool:
        """
        Are there filters that would restrict to one or a few patients?

        (Differs from :func:`any_patient_filtering` with respect to sex.)
        """
        return (
            bool(self.surname)
            or bool(self.forename)
            or self.dob is not None
            or bool(self.idnum_criteria)
        )

    def get_only_iddef(self) -> Optional["IdNumReference"]:
        """
        If a single ID number type/value restriction is being applied, return
        it, as an
        :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`.
        Otherwise, return ``None``.
        """
        # Treat "no criteria stored" (None) like an empty list.
        criteria = self.idnum_criteria or []
        if len(criteria) != 1:
            return None
        return criteria[0]

    def get_group_names(self, req: "CamcopsRequest") -> List[str]:
        """
        Get the names of any groups to which we are restricting.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
                whose ``dbsession`` is used for the lookup

        Returns:
            one name per group found; ``""`` for a group without a name
        """
        groups = (
            req.dbsession.query(Group)
            .filter(Group.id.in_(self.group_ids))
            .all()
        )  # type: List[Group]
        return [g.name if g and g.name else "" for g in groups]

    def get_user_names(self, req: "CamcopsRequest") -> List[str]:
        """
        Get the usernames of any uploading users to which we are restricting.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
                whose ``dbsession`` is used for the lookup

        Returns:
            one username per user found; ``""`` for a user without one
        """
        users = (
            req.dbsession.query(User)
            .filter(User.id.in_(self.adding_user_ids))
            .all()
        )  # type: List[User]
        return [u.username if u and u.username else "" for u in users]

    def get_device_names(self, req: "CamcopsRequest") -> List[str]:
        """
        Get the names of any devices to which we are restricting.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
                whose ``dbsession`` is used for the lookup

        Returns:
            one name per device found; ``""`` for a device without a name
        """
        devices = (
            req.dbsession.query(Device)
            .filter(Device.id.in_(self.device_ids))
            .all()
        )  # type: List[Device]
        return [d.name if d and d.name else "" for d in devices]

    def clear(self) -> None:
        """
        Clear all parts of the filter.

        NOTE(review): ``tasks_offering_trackers_only``,
        ``tasks_with_patient_only`` and the Python-only criteria (``era``,
        ``finalized_only``, ``must_have_idnum_type``) are not reset here;
        confirm whether that is intended.
        """
        self.task_types = []  # type: List[str]

        self.surname = None
        self.forename = None
        self.dob = None  # type: Optional[datetime.date]
        self.sex = None
        self.idnum_criteria = []  # type: List[IdNumReference]

        self.device_ids = []  # type: List[int]
        self.adding_user_ids = []  # type: List[int]
        self.group_ids = []  # type: List[int]
        self.start_datetime = (
            None
        )  # type: Union[None, Pendulum, datetime.datetime]
        self.end_datetime = (
            None
        )  # type: Union[None, Pendulum, datetime.datetime]
        self.text_contents = []  # type: List[str]

        self.complete_only = None  # type: Optional[bool]

        # The cached class list depended on the criteria just cleared.
        self._task_classes = None

    def dates_inconsistent(self) -> bool:
        """
        Are inconsistent dates specified, such that no tasks should be
        returned?

        True only if both a start and an end are set and the end precedes
        the start.
        """
        return bool(
            self.start_datetime
            and self.end_datetime
            and self.end_datetime < self.start_datetime
        )

    def filter_query_by_patient(self, q: Query, via_index: bool) -> Query:
        """
        Restricts a query that has *already been joined* to the
        :class:`camcops_server.cc_modules.cc_patient.Patient` class, according
        to the patient filtering criteria.

        Args:
            q: the starting SQLAlchemy ORM Query
            via_index:
                If ``True``, the query relates to a
                :class:`camcops_server.cc_modules.cc_taskindex.TaskIndexEntry`
                and we should restrict it according to the
                :class:`camcops_server.cc_modules.cc_taskindex.PatientIdNumIndexEntry`
                class. If ``False``, the query relates to a
                :class:`camcops_server.cc_modules.cc_taskindex.Task` and we
                should restrict according to
                :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`.

        Returns:
            a revised Query

        """
        # Name and sex are compared case-insensitively.
        if self.surname:
            q = q.filter(func.upper(Patient.surname) == self.surname.upper())
        if self.forename:
            q = q.filter(func.upper(Patient.forename) == self.forename.upper())
        if self.dob is not None:
            q = q.filter(Patient.dob == self.dob)
        if self.sex:
            q = q.filter(func.upper(Patient.sex) == self.sex.upper())

        if self.idnum_criteria:
            # The two routes differ only in which ID-number class is joined
            # and filtered on.
            if via_index:
                idnum_cls = PatientIdNumIndexEntry  # type: Any
                q = q.join(PatientIdNumIndexEntry)
            else:
                idnum_cls = PatientIdNum
                # Joining on PatientIdNum directly fails, so join via the
                # Patient.idnums relationship instead.
                q = q.join(Patient.idnums)  # type: ignore[arg-type]

            # "Specify possible ID number values"
            id_filter_parts = [
                and_(
                    idnum_cls.which_idnum == iddef.which_idnum,
                    idnum_cls.idnum_value == iddef.idnum_value,
                )
                for iddef in self.idnum_criteria
            ]  # type: List[ColumnElement]
            # Use OR (disjunction) of the specified values:
            q = q.filter(or_(*id_filter_parts))

            # "Must have a value for a given ID number type"
            # NOTE(review): this is only applied when idnum_criteria is
            # non-empty, as in the original code; confirm that is intended.
            if self.must_have_idnum_type:
                # "!= None" is deliberate: it builds an SQL "IS NOT NULL".
                # noinspection PyComparisonWithNone,PyPep8
                q = q.filter(
                    and_(
                        idnum_cls.which_idnum == self.must_have_idnum_type,
                        idnum_cls.idnum_value != None,  # noqa: E711
                    )
                )

        return q

    @property
    def start_datetime_utc(self) -> Optional[Pendulum]:
        """
        The start date/time converted to UTC, or ``None`` if not set.
        """
        if not self.start_datetime:
            return None
        return convert_datetime_to_utc(self.start_datetime)

    @property
    def end_datetime_utc(self) -> Optional[Pendulum]:
        """
        The end date/time converted to UTC, or ``None`` if not set.
        """
        if not self.end_datetime:
            return None
        return convert_datetime_to_utc(self.end_datetime)
def encode_task_filter(taskfilter: TaskFilter) -> Dict:
    """
    Reduce a :class:`TaskFilter` to a plain dictionary for JSON encoding.

    Only ``task_types`` and ``group_ids`` are included; every other
    attribute of the filter is left out.
    """
    return dict(
        task_types=taskfilter.task_types,
        group_ids=taskfilter.group_ids,
    )
# noinspection PyUnusedLocal
def decode_task_filter(d: Dict, cls: Type) -> TaskFilter:
    """
    Rebuild a :class:`TaskFilter` from a dictionary holding ``task_types``
    and ``group_ids`` keys.

    Args:
        d: the dictionary to read; both keys must be present
        cls: unused here

    Raises:
        :exc:`KeyError` if either key is missing
    """
    restored = TaskFilter()
    for attrname in ("task_types", "group_ids"):
        setattr(restored, attrname, d[attrname])

    return restored
# Register TaskFilter for JSON serialization, using encode_task_filter()
# and decode_task_filter() as the converters to and from a dictionary.
register_class_for_json(
    dict_to_obj_fn=decode_task_filter,
    obj_to_dict_fn=encode_task_filter,
    cls=TaskFilter,
)