Coverage for cc_modules/cc_taskfilter.py: 68%

210 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 15:51 +0100

1""" 

2camcops_server/cc_modules/cc_taskfilter.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Representation of filtering criteria for tasks.** 

27 

28""" 

29 

30import datetime 

31from enum import Enum 

32import logging 

33from typing import Any, Dict, List, Optional, Type, TYPE_CHECKING, Union 

34 

35from cardinal_pythonlib.datetimefunc import convert_datetime_to_utc 

36from cardinal_pythonlib.json.serialize import register_class_for_json 

37from cardinal_pythonlib.logs import BraceStyleAdapter 

38from cardinal_pythonlib.reprfunc import auto_repr 

39from cardinal_pythonlib.sqlalchemy.list_types import ( 

40 IntListType, 

41 StringListType, 

42) 

43from pendulum import DateTime as Pendulum 

44from sqlalchemy.orm import Mapped, mapped_column, Query, reconstructor 

45from sqlalchemy.sql.functions import func 

46from sqlalchemy.sql.expression import and_, or_ 

47 

48from camcops_server.cc_modules.cc_cache import cache_region_static, fkg 

49from camcops_server.cc_modules.cc_device import Device 

50from camcops_server.cc_modules.cc_group import Group 

51from camcops_server.cc_modules.cc_patient import Patient 

52from camcops_server.cc_modules.cc_patientidnum import PatientIdNum 

53from camcops_server.cc_modules.cc_simpleobjects import IdNumReference 

54from camcops_server.cc_modules.cc_sqla_coltypes import ( 

55 PendulumDateTimeAsIsoTextColType, 

56 IdNumReferenceListColType, 

57 PatientNameColType, 

58 SexColType, 

59) 

60from camcops_server.cc_modules.cc_sqlalchemy import Base 

61from camcops_server.cc_modules.cc_task import ( 

62 tablename_to_task_class_dict, 

63 Task, 

64) 

65from camcops_server.cc_modules.cc_taskindex import PatientIdNumIndexEntry 

66from camcops_server.cc_modules.cc_user import User 

67 

68if TYPE_CHECKING: 

69 from sqlalchemy.sql.elements import ColumnElement 

70 from camcops_server.cc_modules.cc_request import CamcopsRequest 

71 

# Module-level logger for this module, wrapped in cardinal_pythonlib's
# BraceStyleAdapter.
log = BraceStyleAdapter(logging.getLogger(__name__))

73 

74 

75# ============================================================================= 

76# Sorting helpers 

77# ============================================================================= 

78 

79 

class TaskClassSortMethod(Enum):
    """
    The available orderings for a list of task types (task classes).

    Consumed by :func:`sort_task_classes_in_place`.
    """

    NONE = 0  # leave the list in whatever order it already has
    TABLENAME = 1  # order by each class's ``tablename``
    SHORTNAME = 2  # order by each class's ``shortname``
    LONGNAME = 3  # order by ``longname(req)``; a request object is required

89 

90 

def sort_task_classes_in_place(
    classlist: List[Type[Task]],
    sortmethod: TaskClassSortMethod,
    req: Optional["CamcopsRequest"] = None,
) -> None:
    """
    Sort a list of task classes in place.

    Args:
        classlist: the list of task classes; modified in place
        sortmethod: a :class:`TaskClassSortMethod` enum. With
            ``TaskClassSortMethod.NONE`` the list is left untouched.
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
            Only used (and then mandatory) for
            ``TaskClassSortMethod.LONGNAME``, because each class's
            ``longname()`` is called with the request.

    Raises:
        :exc:`AssertionError` if ``sortmethod`` is ``LONGNAME`` and ``req``
        is ``None``
    """
    if sortmethod == TaskClassSortMethod.TABLENAME:
        classlist.sort(key=lambda c: c.tablename)
    elif sortmethod == TaskClassSortMethod.SHORTNAME:
        classlist.sort(key=lambda c: c.shortname)
    elif sortmethod == TaskClassSortMethod.LONGNAME:
        assert (
            req is not None
        ), "Sorting by long name requires a request object"
        classlist.sort(key=lambda c: c.longname(req))
    # TaskClassSortMethod.NONE: nothing to do.

111 

112 

113# ============================================================================= 

114# Cache task class mapping 

115# ============================================================================= 

116# Function, staticmethod, classmethod? 

117# https://stackoverflow.com/questions/8108688/in-python-when-should-i-use-a-function-instead-of-a-method # noqa 

118# https://stackoverflow.com/questions/11788195/module-function-vs-staticmethod-vs-classmethod-vs-no-decorators-which-idiom-is # noqa 

119# https://stackoverflow.com/questions/15017734/using-static-methods-in-python-best-practice # noqa 

120 

121 

def task_classes_from_table_names(
    tablenames: List[str],
    sortmethod: TaskClassSortMethod = TaskClassSortMethod.NONE,
) -> List[Type[Task]]:
    """
    Look up the task class for each base table name and return the classes,
    sorted as requested.

    Sorting by ``TaskClassSortMethod.LONGNAME`` is not permitted here: that
    method needs a request object, and this function has none to pass to
    :func:`sort_task_classes_in_place`.

    Args:
        tablenames: list of task base table names
        sortmethod: a :class:`TaskClassSortMethod` enum (not ``LONGNAME``)

    Returns:
        a list of task classes, in the order requested

    Raises:
        :exc:`KeyError` if a table name is invalid

    """
    assert sortmethod != TaskClassSortMethod.LONGNAME
    class_for_tablename = tablename_to_task_class_dict()
    # Plain indexing (not .get()) so that an unknown name raises KeyError.
    task_classes = [
        class_for_tablename[name] for name in tablenames
    ]  # type: List[Type[Task]]
    sort_task_classes_in_place(task_classes, sortmethod)
    return task_classes

149 

150 

@cache_region_static.cache_on_arguments(function_key_generator=fkg)
def all_tracker_task_classes() -> List[Type[Task]]:
    """
    Collects every task class whose ``provides_trackers`` attribute is truthy,
    in the order given by ``Task.all_subclasses_by_shortname()``.

    The function is wrapped by ``cache_region_static.cache_on_arguments``.
    """
    tracker_classes = []  # type: List[Type[Task]]
    for task_class in Task.all_subclasses_by_shortname():
        if task_class.provides_trackers:
            tracker_classes.append(task_class)
    return tracker_classes

161 

162 

163# ============================================================================= 

164# Define a filter to apply to tasks 

165# ============================================================================= 

166 

167 

class TaskFilter(Base):
    """
    SQLAlchemy ORM object representing task filter criteria.

    The mapped columns below hold the persistent criteria. A few additional
    Python-only attributes (``era``, ``finalized_only``,
    ``must_have_idnum_type``, plus the private ``_sort_method`` and
    ``_task_classes`` cache) are set up in :meth:`__init__` and
    :meth:`init_on_load` and are not saved to the database.
    """

    __tablename__ = "_task_filters"

    # Lots of these could be changed into lists; for example, filtering to
    # multiple devices, multiple users, multiple text patterns. For
    # AND-joining, there is little clear benefit (one could always AND-join
    # multiple filters with SQL). For OR-joining, this is more useful.
    # - surname: use ID numbers instead; not very likely to have >1 surname
    # - forename: ditto
    # - DOB: ditto
    # - sex: just eliminate the filter if you don't care about sex
    # - task_types: needs a list
    # - device_id: might as well make it a list
    # - user_id: might as well make it a list
    # - group_id: might as well make it a list
    # - start_datetime: single only
    # - end_datetime: single only
    # - text_contents: might as well make it a list
    # - ID numbers: a list, joined with OR.
    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        index=True,
        comment="Task filter ID (arbitrary integer)",
    )
    # Task type filters
    task_types: Mapped[Optional[list[str]]] = mapped_column(
        StringListType,
        comment="Task filter: task type(s), as CSV list of table names",
    )
    tasks_offering_trackers_only: Mapped[Optional[bool]] = mapped_column(
        comment="Task filter: restrict to tasks offering trackers only?",
    )
    tasks_with_patient_only: Mapped[Optional[bool]] = mapped_column(
        comment="Task filter: restrict to tasks with a patient (non-anonymous "
        "tasks) only?",
    )
    # Patient-related filters
    surname: Mapped[Optional[str]] = mapped_column(
        PatientNameColType, comment="Task filter: surname"
    )
    forename: Mapped[Optional[str]] = mapped_column(
        PatientNameColType, comment="Task filter: forename"
    )
    dob: Mapped[Optional[datetime.date]] = mapped_column(
        comment="Task filter: DOB"
    )
    sex: Mapped[Optional[str]] = mapped_column(
        SexColType, comment="Task filter: sex"
    )
    # new in v2.0.1
    idnum_criteria: Mapped[Optional[List[IdNumReference]]] = mapped_column(
        IdNumReferenceListColType,
        comment="ID filters as JSON; the ID number definitions are joined "
        "with OR",
    )
    # Other filters
    device_ids: Mapped[Optional[List[int]]] = mapped_column(
        IntListType,
        comment="Task filter: source device ID(s), as CSV",
    )
    adding_user_ids: Mapped[Optional[List[int]]] = mapped_column(
        "user_ids",
        IntListType,
        comment="Task filter: adding (uploading) user ID(s), as CSV",
    )
    group_ids: Mapped[Optional[List[int]]] = mapped_column(
        IntListType, comment="Task filter: group ID(s), as CSV"
    )
    start_datetime: Mapped[Optional[Pendulum]] = mapped_column(
        "start_datetime_iso8601",
        PendulumDateTimeAsIsoTextColType,
        comment="Task filter: start date/time (UTC as ISO8601)",
    )
    end_datetime: Mapped[Optional[Pendulum]] = mapped_column(
        "end_datetime_iso8601",
        PendulumDateTimeAsIsoTextColType,
        comment="Task filter: end date/time (UTC as ISO8601)",
    )
    # Implemented on the Python side for indexed lookup:
    text_contents: Mapped[Optional[List[str]]] = mapped_column(
        StringListType,
        comment="Task filter: filter text fields",
    )  # task must contain ALL the strings in AT LEAST ONE of its text columns
    # Implemented on the Python side for non-indexed lookup:
    complete_only: Mapped[Optional[bool]] = mapped_column(
        comment="Task filter: task complete?"
    )

    def __init__(self, **kwargs: Any) -> None:
        """
        Create a new (not-yet-loaded-from-database) filter.

        Args:
            kwargs: passed to the SQLAlchemy base class constructor
        """
        super().__init__(**kwargs)
        # We need to initialize these explicitly, because if we create an
        # instance via "x = TaskFilter()", they will be initialized to None,
        # without any recourse to our database to-and-fro conversion code for
        # each fieldtype.
        # (If we load from a database, things will be fine.)
        # NOTE: because these assignments run after super().__init__(), any
        # value passed via kwargs for these five attributes is replaced by an
        # empty list.
        self.idnum_criteria = []  # type: List[IdNumReference]
        self.device_ids = []  # type: List[int]
        self.adding_user_ids = []  # type: List[int]
        self.group_ids = []  # type: List[int]
        self.text_contents = []  # type: List[str]

        # ANYTHING YOU ADD BELOW HERE MUST ALSO BE IN init_on_load().
        # Or call it, of course, but we like to keep on the happy side of the
        # PyCharm type checker.

        # Python-only filtering attributes (i.e. not saved to database)
        self.era = None  # type: Optional[str]
        self.finalized_only = False  # used for exports
        self.must_have_idnum_type = None  # type: Optional[int]

        # Other Python-only attributes
        self._sort_method = TaskClassSortMethod.NONE
        self._task_classes = None  # type: Optional[List[Type[Task]]]

    @reconstructor
    def init_on_load(self) -> None:
        """
        SQLAlchemy function to recreate after loading from the database.

        Resets the Python-only attributes to the same defaults that
        :meth:`__init__` uses.
        """
        self.era = None  # type: ignore[no-redef]  # type: Optional[str]
        self.finalized_only = False
        self.must_have_idnum_type = None  # type: ignore[no-redef]

        self._sort_method = TaskClassSortMethod.NONE
        self._task_classes = None  # type: ignore[no-redef]

    def __repr__(self) -> str:
        return auto_repr(self, with_addr=True)

    def set_sort_method(self, sort_method: TaskClassSortMethod) -> None:
        """
        Sets the sorting method for task types.

        The method is applied when :attr:`task_classes` builds its list; a
        list that has already been cached is not re-sorted.

        Raises:
            :exc:`AssertionError` for ``TaskClassSortMethod.LONGNAME``
        """
        assert sort_method != TaskClassSortMethod.LONGNAME, (
            "If you want to use that sorting method, you need to save a "
            "request object, because long task names use translation"
        )
        self._sort_method = sort_method

    @property
    def task_classes(self) -> List[Type[Task]]:
        """
        Return a list of task classes permitted by the filter.

        Uses caching, since the filter will be called repeatedly.
        """
        if self._task_classes is None:
            self._task_classes = []  # type: ignore[no-redef]
            if self.task_types:
                starting_classes = task_classes_from_table_names(
                    self.task_types
                )
            else:
                # No explicit task types: start from every task class.
                starting_classes = Task.all_subclasses_by_shortname()
            skip_anonymous = self.skip_anonymous_tasks()
            for cls in starting_classes:
                if (
                    self.tasks_offering_trackers_only
                    and not cls.provides_trackers
                ):
                    # Class doesn't provide trackers; skip
                    continue
                if skip_anonymous and not cls.has_patient:
                    # Anonymous task; skip
                    continue
                if self.text_contents and not cls.get_text_filter_columns():
                    # Text filter and task has no text columns; skip
                    continue
                self._task_classes.append(cls)
            sort_task_classes_in_place(self._task_classes, self._sort_method)
        return self._task_classes

    def skip_anonymous_tasks(self) -> bool:
        """
        Should we skip anonymous tasks?

        True if the filter is restricted to tasks with a patient, or if any
        patient-based filtering is in use.
        """
        return self.tasks_with_patient_only or self.any_patient_filtering()

    def offers_all_task_types(self) -> bool:
        """
        Does this filter offer every single task class? Used for efficiency
        when using indexes. (Since ignored.)
        """
        if self.tasks_offering_trackers_only:
            return False
        if self.skip_anonymous_tasks():
            return False
        if not self.task_types:
            return True
        # The classmethod must be *called*: passing the bound method itself
        # to set() raises TypeError ("'method' object is not iterable").
        return set(self.task_classes) == set(
            Task.all_subclasses_by_shortname()
        )

    def offers_all_non_anonymous_task_types(self) -> bool:
        """
        Does this filter offer every single non-anonymous task class? Used for
        efficiency when using indexes.
        """
        offered_task_classes = self.task_classes
        for taskclass in Task.all_subclasses_by_shortname():
            if taskclass.is_anonymous:
                continue
            if taskclass not in offered_task_classes:
                return False
        return True

    @property
    def task_tablename_list(self) -> List[str]:
        """
        Returns the base table names for all task types permitted by the
        filter.
        """
        return [cls.__tablename__ for cls in self.task_classes]

    def any_patient_filtering(self) -> bool:
        """
        Is some sort of patient filtering being applied?
        """
        return (
            bool(self.surname)
            or bool(self.forename)
            or (self.dob is not None)
            or bool(self.sex)
            or bool(self.idnum_criteria)
        )

    def any_specific_patient_filtering(self) -> bool:
        """
        Are there filters that would restrict to one or a few patients?

        (Differs from :func:`any_patient_filtering` with respect to sex.)
        """
        return (
            bool(self.surname)
            or bool(self.forename)
            or self.dob is not None
            or bool(self.idnum_criteria)
        )

    def get_only_iddef(self) -> Optional["IdNumReference"]:
        """
        If a single ID number type/value restriction is being applied, return
        it, as an
        :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`.
        Otherwise, return ``None``.
        """
        if len(self.idnum_criteria) != 1:
            return None
        return self.idnum_criteria[0]

    def get_group_names(self, req: "CamcopsRequest") -> List[str]:
        """
        Get the names of any groups to which we are restricting.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
                whose ``dbsession`` is queried

        Returns:
            one string per matching group; ``""`` where a group has no name
        """
        groups = (
            req.dbsession.query(Group)
            .filter(Group.id.in_(self.group_ids))
            .all()
        )  # type: List[Group]
        return [g.name if g and g.name else "" for g in groups]

    def get_user_names(self, req: "CamcopsRequest") -> List[str]:
        """
        Get the usernames of any uploading users to which we are restricting.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
                whose ``dbsession`` is queried

        Returns:
            one string per matching user; ``""`` where a user has no username
        """
        users = (
            req.dbsession.query(User)
            .filter(User.id.in_(self.adding_user_ids))
            .all()
        )  # type: List[User]
        return [u.username if u and u.username else "" for u in users]

    def get_device_names(self, req: "CamcopsRequest") -> List[str]:
        """
        Get the names of any devices to which we are restricting.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
                whose ``dbsession`` is queried

        Returns:
            one string per matching device; ``""`` where a device has no name
        """
        devices = (
            req.dbsession.query(Device)
            .filter(Device.id.in_(self.device_ids))
            .all()
        )  # type: List[Device]
        return [d.name if d and d.name else "" for d in devices]

    def clear(self) -> None:
        """
        Clear all parts of the filter.

        NOTE(review): ``tasks_offering_trackers_only``,
        ``tasks_with_patient_only`` and the Python-only attributes are not
        reset here, and the cached ``_task_classes`` list is not invalidated.
        Confirm whether that is intended.
        """
        self.task_types = []  # type: List[str]

        self.surname = None
        self.forename = None
        self.dob = None  # type: Optional[datetime.date]
        self.sex = None
        self.idnum_criteria = []  # type: List[IdNumReference]

        self.device_ids = []  # type: List[int]
        self.adding_user_ids = []  # type: List[int]
        self.group_ids = []  # type: List[int]
        self.start_datetime = (
            None
        )  # type: Union[None, Pendulum, datetime.datetime]
        self.end_datetime = (
            None
        )  # type: Union[None, Pendulum, datetime.datetime]
        self.text_contents = []  # type: List[str]

        self.complete_only = None  # type: Optional[bool]

    def dates_inconsistent(self) -> bool:
        """
        Are inconsistent dates specified, such that no tasks should be
        returned?

        True only if both a start and an end date/time are set and the end
        precedes the start.
        """
        # bool() so that we honour the declared return type; the bare
        # "a and b and c" chain would return None or a datetime object when
        # either date is missing.
        return bool(
            self.start_datetime
            and self.end_datetime
            and self.end_datetime < self.start_datetime
        )

    def filter_query_by_patient(self, q: Query, via_index: bool) -> Query:
        """
        Restricts a query that has *already been joined* to the
        :class:`camcops_server.cc_modules.cc_patient.Patient` class, according
        to the patient filtering criteria.

        Args:
            q: the starting SQLAlchemy ORM Query
            via_index:
                If ``True``, the query relates to a
                :class:`camcops_server.cc_modules.cc_taskindex.TaskIndexEntry`
                and we should restrict it according to the
                :class:`camcops_server.cc_modules.cc_taskindex.PatientIdNumIndexEntry`
                class. If ``False``, the query relates to a
                :class:`camcops_server.cc_modules.cc_task.Task` and we
                should restrict according to
                :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`.

        Returns:
            a revised Query

        """
        # Name and sex comparisons are case-insensitive (both sides are
        # upper-cased).
        if self.surname:
            q = q.filter(func.upper(Patient.surname) == self.surname.upper())
        if self.forename:
            q = q.filter(func.upper(Patient.forename) == self.forename.upper())
        if self.dob is not None:
            q = q.filter(Patient.dob == self.dob)
        if self.sex:
            q = q.filter(func.upper(Patient.sex) == self.sex.upper())

        # Note: the "must have ID number type" restriction below is only
        # applied when at least one ID number criterion is present.
        if self.idnum_criteria:
            id_filter_parts = []  # type: List[ColumnElement]
            if via_index:
                q = q.join(PatientIdNumIndexEntry)
                # "Specify possible ID number values"
                for iddef in self.idnum_criteria:
                    id_filter_parts.append(
                        and_(
                            PatientIdNumIndexEntry.which_idnum
                            == iddef.which_idnum,
                            PatientIdNumIndexEntry.idnum_value
                            == iddef.idnum_value,
                        )
                    )
                # Use OR (disjunction) of the specified values:
                q = q.filter(or_(*id_filter_parts))
                # "Must have a value for a given ID number type"
                if self.must_have_idnum_type:
                    # noinspection PyComparisonWithNone,PyPep8
                    q = q.filter(
                        and_(
                            PatientIdNumIndexEntry.which_idnum
                            == self.must_have_idnum_type,
                            PatientIdNumIndexEntry.idnum_value
                            != None,  # noqa: E711
                        )
                    )
            else:
                # q = q.join(PatientIdNum) # fails
                q = q.join(Patient.idnums)  # type: ignore[arg-type]
                # "Specify possible ID number values"
                for iddef in self.idnum_criteria:
                    id_filter_parts.append(
                        and_(
                            PatientIdNum.which_idnum == iddef.which_idnum,
                            PatientIdNum.idnum_value == iddef.idnum_value,
                        )
                    )
                # Use OR (disjunction) of the specified values:
                q = q.filter(or_(*id_filter_parts))
                # "Must have a value for a given ID number type"
                if self.must_have_idnum_type:
                    # noinspection PyComparisonWithNone,PyPep8
                    q = q.filter(
                        and_(
                            PatientIdNum.which_idnum
                            == self.must_have_idnum_type,
                            PatientIdNum.idnum_value != None,  # noqa: E711
                        )
                    )

        return q

    @property
    def start_datetime_utc(self) -> Optional[Pendulum]:
        """
        The start date/time passed through ``convert_datetime_to_utc()``, or
        ``None`` if no start date/time is set.
        """
        if not self.start_datetime:
            return None
        return convert_datetime_to_utc(self.start_datetime)

    @property
    def end_datetime_utc(self) -> Optional[Pendulum]:
        """
        The end date/time passed through ``convert_datetime_to_utc()``, or
        ``None`` if no end date/time is set.
        """
        if not self.end_datetime:
            return None
        return convert_datetime_to_utc(self.end_datetime)

585 

586 

def encode_task_filter(taskfilter: TaskFilter) -> Dict:
    """
    Build the dictionary form of a :class:`TaskFilter` used for JSON
    serialization.

    Only ``task_types`` and ``group_ids`` are included; every other attribute
    of the filter is omitted from the encoded form.
    """
    encoded_attrs = ("task_types", "group_ids")
    return {attr: getattr(taskfilter, attr) for attr in encoded_attrs}

592 

593 

# noinspection PyUnusedLocal
def decode_task_filter(d: Dict, cls: Type) -> TaskFilter:
    """
    Rebuild a :class:`TaskFilter` from the dictionary produced by
    :func:`encode_task_filter`.

    Args:
        d: dictionary that must contain the keys ``"task_types"`` and
            ``"group_ids"``
        cls: unused

    Returns:
        a new :class:`TaskFilter` with those two attributes set from ``d``

    Raises:
        :exc:`KeyError` if either key is missing from ``d``
    """
    restored = TaskFilter()
    for key in ("task_types", "group_ids"):
        setattr(restored, key, d[key])
    return restored

601 

602 

# Register TaskFilter with cardinal_pythonlib's JSON serialization, using the
# encode/decode functions above (which carry only task_types and group_ids).
register_class_for_json(
    cls=TaskFilter,
    obj_to_dict_fn=encode_task_filter,
    dict_to_obj_fn=decode_task_filter,
)

607)