Coverage for cc_modules/cc_taskfilter.py : 39%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
3"""
4camcops_server/cc_modules/cc_taskfilter.py
6===============================================================================
8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com).
10 This file is part of CamCOPS.
12 CamCOPS is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
17 CamCOPS is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License for more details.
22 You should have received a copy of the GNU General Public License
23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
25===============================================================================
27**Representation of filtering criteria for tasks.**
29"""
31import datetime
32from enum import Enum
33import logging
34from typing import Dict, List, Optional, Type, TYPE_CHECKING, Union
36from cardinal_pythonlib.datetimefunc import convert_datetime_to_utc
37from cardinal_pythonlib.json.serialize import register_class_for_json
38from cardinal_pythonlib.logs import BraceStyleAdapter
39from cardinal_pythonlib.reprfunc import auto_repr
40from cardinal_pythonlib.sqlalchemy.list_types import (
41 IntListType,
42 StringListType,
43)
44from pendulum import DateTime as Pendulum
45from sqlalchemy.orm import Query, reconstructor
46from sqlalchemy.sql.functions import func
47from sqlalchemy.sql.expression import and_, or_
48from sqlalchemy.sql.schema import Column
49from sqlalchemy.sql.sqltypes import Boolean, Date, Integer
51from camcops_server.cc_modules.cc_cache import cache_region_static, fkg
52from camcops_server.cc_modules.cc_device import Device
53from camcops_server.cc_modules.cc_group import Group
54from camcops_server.cc_modules.cc_patient import Patient
55from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
56from camcops_server.cc_modules.cc_sqla_coltypes import (
57 PendulumDateTimeAsIsoTextColType,
58 IdNumReferenceListColType,
59 PatientNameColType,
60 SexColType,
61)
62from camcops_server.cc_modules.cc_sqlalchemy import Base
63from camcops_server.cc_modules.cc_task import (
64 tablename_to_task_class_dict,
65 Task,
66)
67from camcops_server.cc_modules.cc_taskindex import PatientIdNumIndexEntry
68from camcops_server.cc_modules.cc_user import User
70if TYPE_CHECKING:
71 from sqlalchemy.sql.elements import ColumnElement
72 from camcops_server.cc_modules.cc_request import CamcopsRequest
73 from camcops_server.cc_modules.cc_simpleobjects import IdNumReference
75log = BraceStyleAdapter(logging.getLogger(__name__))
78# =============================================================================
79# Sorting helpers
80# =============================================================================
class TaskClassSortMethod(Enum):
    """
    The orderings that can be requested when sorting task classes.

    Consumed by :func:`sort_task_classes_in_place`.
    """
    NONE = 0  # leave the existing order alone
    TABLENAME = 1  # order by each class's ``tablename``
    SHORTNAME = 2  # order by each class's ``shortname``
    LONGNAME = 3  # order by each class's ``longname(req)``; needs a request
def sort_task_classes_in_place(classlist: List[Type[Task]],
                               sortmethod: TaskClassSortMethod,
                               req: Optional["CamcopsRequest"] = None) -> None:
    """
    Sort a list of task classes in place.

    Args:
        classlist: the list of task classes; it is mutated, not copied
        sortmethod: a :class:`TaskClassSortMethod` enum
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`;
            only used (and then required) for
            ``TaskClassSortMethod.LONGNAME``, because ``longname()`` is
            called with it

    Raises:
        :exc:`AssertionError` if ``LONGNAME`` sorting is requested without
        a request object
    """
    if sortmethod == TaskClassSortMethod.TABLENAME:
        classlist.sort(key=lambda c: c.tablename)
    elif sortmethod == TaskClassSortMethod.SHORTNAME:
        classlist.sort(key=lambda c: c.shortname)
    elif sortmethod == TaskClassSortMethod.LONGNAME:
        assert req is not None, (
            "Sorting by long name requires a request object (req)"
        )
        classlist.sort(key=lambda c: c.longname(req))
    # Any other value (i.e. TaskClassSortMethod.NONE): leave the order as is.
112# =============================================================================
113# Cache task class mapping
114# =============================================================================
115# Function, staticmethod, classmethod?
116# https://stackoverflow.com/questions/8108688/in-python-when-should-i-use-a-function-instead-of-a-method # noqa
117# https://stackoverflow.com/questions/11788195/module-function-vs-staticmethod-vs-classmethod-vs-no-decorators-which-idiom-is # noqa
118# https://stackoverflow.com/questions/15017734/using-static-methods-in-python-best-practice # noqa
def task_classes_from_table_names(
        tablenames: List[str],
        sortmethod: TaskClassSortMethod = TaskClassSortMethod.NONE) \
        -> List[Type[Task]]:
    """
    Look up the task class for each base table name and return the classes
    as a list, sorted as requested.

    Args:
        tablenames: list of task base table names
        sortmethod: a :class:`TaskClassSortMethod` enum; ``LONGNAME`` is not
            permitted here (no request object is available to pass on)

    Returns:
        a list of task classes, in the order requested

    Raises:
        :exc:`KeyError` if a table name is invalid

    """
    assert sortmethod != TaskClassSortMethod.LONGNAME
    class_by_tablename = tablename_to_task_class_dict()
    matched = [
        class_by_tablename[name] for name in tablenames
    ]  # type: List[Type[Task]]
    sort_task_classes_in_place(matched, sortmethod)
    return matched
@cache_region_static.cache_on_arguments(function_key_generator=fkg)
def all_tracker_task_classes() -> List[Type[Task]]:
    """
    Collects every task class whose ``provides_trackers`` attribute is
    truthy, in the order given by ``Task.all_subclasses_by_shortname()``.

    The result is cached via the ``cache_region_static`` decorator.
    """
    tracker_classes = []  # type: List[Type[Task]]
    for task_class in Task.all_subclasses_by_shortname():
        if task_class.provides_trackers:
            tracker_classes.append(task_class)
    return tracker_classes
158# =============================================================================
159# Define a filter to apply to tasks
160# =============================================================================
class TaskFilter(Base):
    """
    SQLAlchemy ORM object representing task filter criteria.

    The columns below hold the persistent criteria. A few Python-only
    attributes (``era``, ``finalized_only``, ``must_have_idnum_type``) plus
    the sort method and a cache of permitted task classes are created in
    ``__init__`` and re-created by ``init_on_load`` after a database load.
    """
    __tablename__ = "_task_filters"

    # Lots of these could be changed into lists; for example, filtering to
    # multiple devices, multiple users, multiple text patterns. For
    # AND-joining, there is little clear benefit (one could always AND-join
    # multiple filters with SQL). For OR-joining, this is more useful.
    # - surname: use ID numbers instead; not very likely to have >1 surname
    # - forename: ditto
    # - DOB: ditto
    # - sex: just eliminate the filter if you don't care about sex
    # - task_types: needs a list
    # - device_id: might as well make it a list
    # - user_id: might as well make it a list
    # - group_id: might as well make it a list
    # - start_datetime: single only
    # - end_datetime: single only
    # - text_contents: might as well make it a list
    # - ID numbers: a list, joined with OR.
    id = Column(
        "id", Integer,
        primary_key=True, autoincrement=True, index=True,
        comment="Task filter ID (arbitrary integer)"
    )
    # Task type filters
    task_types = Column(
        "task_types", StringListType,
        comment="Task filter: task type(s), as CSV list of table names"
    )
    tasks_offering_trackers_only = Column(
        "tasks_offering_trackers_only", Boolean,
        comment="Task filter: restrict to tasks offering trackers only?"
    )
    tasks_with_patient_only = Column(
        "tasks_with_patient_only", Boolean,
        comment="Task filter: restrict to tasks with a patient (non-anonymous "
                "tasks) only?"
    )
    # Patient-related filters
    surname = Column(
        "surname", PatientNameColType,
        comment="Task filter: surname"
    )
    forename = Column(
        "forename", PatientNameColType,
        comment="Task filter: forename"
    )
    dob = Column(
        "dob", Date,
        comment="Task filter: DOB"
    )  # type: Optional[datetime.date]
    sex = Column(
        "sex", SexColType,
        comment="Task filter: sex"
    )
    idnum_criteria = Column(  # new in v2.0.1
        "idnum_criteria", IdNumReferenceListColType,
        comment="ID filters as JSON; the ID number definitions are joined "
                "with OR"
    )
    # Other filters
    device_ids = Column(
        "device_ids", IntListType,
        comment="Task filter: source device ID(s), as CSV"
    )
    # NB: the Python attribute is "adding_user_ids" but the database column
    # is named "user_ids".
    adding_user_ids = Column(
        "user_ids", IntListType,
        comment="Task filter: adding (uploading) user ID(s), as CSV"
    )
    group_ids = Column(
        "group_ids", IntListType,
        comment="Task filter: group ID(s), as CSV"
    )
    start_datetime = Column(
        "start_datetime_iso8601", PendulumDateTimeAsIsoTextColType,
        comment="Task filter: start date/time (UTC as ISO8601)"
    )  # type: Union[None, Pendulum, datetime.datetime]
    end_datetime = Column(
        "end_datetime_iso8601", PendulumDateTimeAsIsoTextColType,
        comment="Task filter: end date/time (UTC as ISO8601)"
    )  # type: Union[None, Pendulum, datetime.datetime]
    # Implemented on the Python side for indexed lookup:
    text_contents = Column(
        "text_contents", StringListType,
        comment="Task filter: filter text fields"
    )  # task must contain ALL the strings in AT LEAST ONE of its text columns
    # Implemented on the Python side for non-indexed lookup:
    complete_only = Column(
        "complete_only", Boolean,
        comment="Task filter: task complete?"
    )

    def __init__(self) -> None:
        """
        Create an empty filter, with list-type criteria set to empty lists.
        """
        # We need to initialize these explicitly, because if we create an
        # instance via "x = TaskFilter()", they will be initialized to None,
        # without any recourse to our database to-and-fro conversion code for
        # each fieldtype.
        # (If we load from a database, things will be fine.)
        self.idnum_criteria = []  # type: List[IdNumReference]
        self.device_ids = []  # type: List[int]
        self.adding_user_ids = []  # type: List[int]
        self.group_ids = []  # type: List[int]
        self.text_contents = []  # type: List[str]

        # ANYTHING YOU ADD BELOW HERE MUST ALSO BE IN init_on_load().
        # Or call it, of course, but we like to keep on the happy side of the
        # PyCharm type checker.

        # Python-only filtering attributes (i.e. not saved to database)
        self.era = None  # type: Optional[str]
        self.finalized_only = False  # used for exports
        self.must_have_idnum_type = None  # type: Optional[int]

        # Other Python-only attributes
        self._sort_method = TaskClassSortMethod.NONE
        self._task_classes = None  # type: Optional[List[Type[Task]]]

    @reconstructor
    def init_on_load(self):
        """
        SQLAlchemy function to recreate after loading from the database.

        Sets the same Python-only attributes as ``__init__``.
        """
        self.era = None  # type: Optional[str]
        self.finalized_only = False
        self.must_have_idnum_type = None  # type: Optional[int]

        self._sort_method = TaskClassSortMethod.NONE
        self._task_classes = None  # type: Optional[List[Type[Task]]]

    def __repr__(self) -> str:
        """
        Returns the representation built by ``auto_repr`` (with address).
        """
        return auto_repr(self, with_addr=True)

    def set_sort_method(self, sort_method: TaskClassSortMethod) -> None:
        """
        Sets the sorting method for task types.

        ``LONGNAME`` is rejected, because the filter holds no request object.
        """
        assert sort_method != TaskClassSortMethod.LONGNAME, (
            "If you want to use that sorting method, you need to save a "
            "request object, because long task names use translation"
        )
        self._sort_method = sort_method

    @property
    def task_classes(self) -> List[Type[Task]]:
        """
        Return a list of task classes permitted by the filter.

        Uses caching, since the filter will be called repeatedly.

        Raises:
            :exc:`KeyError` if ``task_types`` contains an invalid table name
        """
        if self._task_classes is None:
            if self.task_types:
                starting_classes = task_classes_from_table_names(
                    self.task_types)
            else:
                starting_classes = Task.all_subclasses_by_shortname()
            skip_anonymous_tasks = self.skip_anonymous_tasks()
            permitted = []  # type: List[Type[Task]]
            for cls in starting_classes:
                if (self.tasks_offering_trackers_only and
                        not cls.provides_trackers):
                    # Class doesn't provide trackers; skip
                    continue
                if skip_anonymous_tasks and not cls.has_patient:
                    # Anonymous task; skip
                    continue
                if self.text_contents and not cls.get_text_filter_columns():
                    # Text filter and task has no text columns; skip
                    continue
                permitted.append(cls)
            sort_task_classes_in_place(permitted, self._sort_method)
            # Only store the cache once fully built, so that an exception
            # above cannot leave a half-built (empty) list cached.
            self._task_classes = permitted
        return self._task_classes

    def skip_anonymous_tasks(self) -> bool:
        """
        Should we skip anonymous tasks? True if the filter is restricted to
        tasks with a patient, or if any patient criterion is set.
        """
        return self.tasks_with_patient_only or self.any_patient_filtering()

    def offers_all_task_types(self) -> bool:
        """
        Does this filter offer every single task class? Used for efficiency
        when using indexes. (Since ignored.)
        """
        if self.tasks_offering_trackers_only:
            return False
        if self.skip_anonymous_tasks():
            return False
        if not self.task_types:
            return True
        # all_subclasses_by_shortname must be *called*; passing the method
        # object itself to set() raises TypeError.
        return (
            set(self.task_classes) ==
            set(Task.all_subclasses_by_shortname())
        )

    def offers_all_non_anonymous_task_types(self) -> bool:
        """
        Does this filter offer every single non-anonymous task class? Used for
        efficiency when using indexes.
        """
        offered_task_classes = set(self.task_classes)
        return all(
            taskclass.is_anonymous or taskclass in offered_task_classes
            for taskclass in Task.all_subclasses_by_shortname()
        )

    @property
    def task_tablename_list(self) -> List[str]:
        """
        Returns the base table names for all task types permitted by the
        filter.
        """
        return [cls.__tablename__ for cls in self.task_classes]

    def any_patient_filtering(self) -> bool:
        """
        Is some sort of patient filtering being applied (surname, forename,
        DOB, sex, or ID number criteria)?
        """
        return (
            bool(self.surname) or
            bool(self.forename) or
            (self.dob is not None) or
            bool(self.sex) or
            bool(self.idnum_criteria)
        )

    def any_specific_patient_filtering(self) -> bool:
        """
        Are there filters that would restrict to one or a few patients?

        (Differs from :func:`any_patient_filtering` with respect to sex.)
        """
        return (
            bool(self.surname) or
            bool(self.forename) or
            self.dob is not None or
            bool(self.idnum_criteria)
        )

    def get_only_iddef(self) -> Optional["IdNumReference"]:
        """
        If a single ID number type/value restriction is being applied, return
        it, as an
        :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`.
        Otherwise, return ``None``.
        """
        if len(self.idnum_criteria) != 1:
            return None
        return self.idnum_criteria[0]

    def get_group_names(self, req: "CamcopsRequest") -> List[str]:
        """
        Get the names of any groups to which we are restricting.

        Nameless groups are reported as an empty string.
        """
        if not self.group_ids:
            # Nothing to look up; avoid an "IN ()" query.
            return []
        groups = (
            req.dbsession.query(Group)
            .filter(Group.id.in_(self.group_ids))
            .all()
        )  # type: List[Group]
        return [g.name if g and g.name else "" for g in groups]

    def get_user_names(self, req: "CamcopsRequest") -> List[str]:
        """
        Get the usernames of any uploading users to which we are restricting.

        Users without a username are reported as an empty string.
        """
        if not self.adding_user_ids:
            # Nothing to look up; avoid an "IN ()" query.
            return []
        users = (
            req.dbsession.query(User)
            .filter(User.id.in_(self.adding_user_ids))
            .all()
        )  # type: List[User]
        return [u.username if u and u.username else "" for u in users]

    def get_device_names(self, req: "CamcopsRequest") -> List[str]:
        """
        Get the names of any devices to which we are restricting.

        Nameless devices are reported as an empty string.
        """
        if not self.device_ids:
            # Nothing to look up; avoid an "IN ()" query.
            return []
        devices = (
            req.dbsession.query(Device)
            .filter(Device.id.in_(self.device_ids))
            .all()
        )  # type: List[Device]
        return [d.name if d and d.name else "" for d in devices]

    def clear(self) -> None:
        """
        Clear the filter criteria assigned below, and drop the cached list of
        permitted task classes (which depends on some of them).

        NOTE(review): ``tasks_offering_trackers_only`` and
        ``tasks_with_patient_only`` are not reset here -- confirm whether
        that is intentional.
        """
        self.task_types = []  # type: List[str]

        self.surname = None
        self.forename = None
        self.dob = None  # type: Optional[datetime.date]
        self.sex = None
        self.idnum_criteria = []  # type: List[IdNumReference]

        self.device_ids = []  # type: List[int]
        self.adding_user_ids = []  # type: List[int]
        self.group_ids = []  # type: List[int]
        self.start_datetime = None  # type: Union[None, Pendulum, datetime.datetime]  # noqa
        self.end_datetime = None  # type: Union[None, Pendulum, datetime.datetime]  # noqa
        self.text_contents = []  # type: List[str]

        self.complete_only = None  # type: Optional[bool]

        # The cached class list was derived from the criteria just changed.
        self._task_classes = None  # type: Optional[List[Type[Task]]]

    def dates_inconsistent(self) -> bool:
        """
        Are inconsistent dates specified, such that no tasks should be
        returned? (That is: both dates are set, and the end precedes the
        start.)
        """
        # bool(): the bare "and" chain would return None or a datetime when
        # either date is unset.
        return bool(
            self.start_datetime and self.end_datetime and
            self.end_datetime < self.start_datetime
        )

    def filter_query_by_patient(self, q: Query,
                                via_index: bool) -> Query:
        """
        Restricts a query that has *already been joined* to the
        :class:`camcops_server.cc_modules.cc_patient.Patient` class, according
        to the patient filtering criteria.

        Args:
            q: the starting SQLAlchemy ORM Query
            via_index:
                If ``True``, the query relates to a
                :class:`camcops_server.cc_modules.cc_taskindex.TaskIndexEntry`
                and we should restrict it according to the
                :class:`camcops_server.cc_modules.cc_taskindex.PatientIdNumIndexEntry`
                class. If ``False``, the query relates to a
                :class:`camcops_server.cc_modules.cc_task.Task` and we
                should restrict according to
                :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`.

        Returns:
            a revised Query

        """
        # Name and sex comparisons are case-insensitive (both sides are
        # upper-cased).
        if self.surname:
            q = q.filter(func.upper(Patient.surname) ==
                         self.surname.upper())
        if self.forename:
            q = q.filter(func.upper(Patient.forename) ==
                         self.forename.upper())
        if self.dob is not None:
            q = q.filter(Patient.dob == self.dob)
        if self.sex:
            q = q.filter(func.upper(Patient.sex) == self.sex.upper())

        if self.idnum_criteria:
            # The two routes differ only in what is joined and in which
            # class supplies the which_idnum/idnum_value columns.
            if via_index:
                idnum_cls = PatientIdNumIndexEntry
                q = q.join(PatientIdNumIndexEntry)
            else:
                idnum_cls = PatientIdNum
                # q = q.join(PatientIdNum)  # fails
                q = q.join(Patient.idnums)
            # "Specify possible ID number values"
            id_filter_parts = [
                and_(
                    idnum_cls.which_idnum == iddef.which_idnum,
                    idnum_cls.idnum_value == iddef.idnum_value
                )
                for iddef in self.idnum_criteria
            ]  # type: List[ColumnElement]
            # Use OR (disjunction) of the specified values:
            q = q.filter(or_(*id_filter_parts))
            # "Must have a value for a given ID number type"
            # (only applied when ID number criteria are present)
            if self.must_have_idnum_type:
                # noinspection PyComparisonWithNone,PyPep8
                q = q.filter(and_(
                    idnum_cls.which_idnum == self.must_have_idnum_type,
                    idnum_cls.idnum_value != None  # noqa: E711
                ))

        return q

    @property
    def start_datetime_utc(self) -> Optional[Pendulum]:
        """
        The start date/time passed through ``convert_datetime_to_utc``, or
        ``None`` if no start date/time is set.
        """
        if not self.start_datetime:
            return None
        return convert_datetime_to_utc(self.start_datetime)

    @property
    def end_datetime_utc(self) -> Optional[Pendulum]:
        """
        The end date/time passed through ``convert_datetime_to_utc``, or
        ``None`` if no end date/time is set.
        """
        if not self.end_datetime:
            return None
        return convert_datetime_to_utc(self.end_datetime)
def encode_task_filter(taskfilter: TaskFilter) -> Dict:
    """
    Build the dictionary form of a task filter for JSON serialization.

    Only the ``task_types`` and ``group_ids`` criteria are included.
    """
    encoded = dict(
        task_types=taskfilter.task_types,
        group_ids=taskfilter.group_ids,
    )
    return encoded
# noinspection PyUnusedLocal
def decode_task_filter(d: Dict, cls: Type) -> TaskFilter:
    """
    Rebuild a :class:`TaskFilter` from its dictionary (JSON) form.

    Args:
        d: dictionary with ``"task_types"`` and ``"group_ids"`` keys
        cls: unused here

    Raises:
        :exc:`KeyError` if either key is missing
    """
    restored = TaskFilter()
    restored.task_types = d["task_types"]
    restored.group_ids = d["group_ids"]

    return restored
# Register TaskFilter for JSON (de)serialization, using the encoder/decoder
# functions defined above. Note that the encoder emits only "task_types" and
# "group_ids", so no other filter criteria are serialized.
register_class_for_json(
    cls=TaskFilter,
    obj_to_dict_fn=encode_task_filter,
    dict_to_obj_fn=decode_task_filter
)