Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2 

3""" 

4camcops_server/cc_modules/cc_taskfilter.py 

5 

6=============================================================================== 

7 

8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com). 

9 

10 This file is part of CamCOPS. 

11 

12 CamCOPS is free software: you can redistribute it and/or modify 

13 it under the terms of the GNU General Public License as published by 

14 the Free Software Foundation, either version 3 of the License, or 

15 (at your option) any later version. 

16 

17 CamCOPS is distributed in the hope that it will be useful, 

18 but WITHOUT ANY WARRANTY; without even the implied warranty of 

19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20 GNU General Public License for more details. 

21 

22 You should have received a copy of the GNU General Public License 

23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

24 

25=============================================================================== 

26 

27**Representation of filtering criteria for tasks.** 

28 

29""" 

30 

31import datetime 

32from enum import Enum 

33import logging 

34from typing import Dict, List, Optional, Type, TYPE_CHECKING, Union 

35 

36from cardinal_pythonlib.datetimefunc import convert_datetime_to_utc 

37from cardinal_pythonlib.json.serialize import register_class_for_json 

38from cardinal_pythonlib.logs import BraceStyleAdapter 

39from cardinal_pythonlib.reprfunc import auto_repr 

40from cardinal_pythonlib.sqlalchemy.list_types import ( 

41 IntListType, 

42 StringListType, 

43) 

44from pendulum import DateTime as Pendulum 

45from sqlalchemy.orm import Query, reconstructor 

46from sqlalchemy.sql.functions import func 

47from sqlalchemy.sql.expression import and_, or_ 

48from sqlalchemy.sql.schema import Column 

49from sqlalchemy.sql.sqltypes import Boolean, Date, Integer 

50 

51from camcops_server.cc_modules.cc_cache import cache_region_static, fkg 

52from camcops_server.cc_modules.cc_device import Device 

53from camcops_server.cc_modules.cc_group import Group 

54from camcops_server.cc_modules.cc_patient import Patient 

55from camcops_server.cc_modules.cc_patientidnum import PatientIdNum 

56from camcops_server.cc_modules.cc_sqla_coltypes import ( 

57 PendulumDateTimeAsIsoTextColType, 

58 IdNumReferenceListColType, 

59 PatientNameColType, 

60 SexColType, 

61) 

62from camcops_server.cc_modules.cc_sqlalchemy import Base 

63from camcops_server.cc_modules.cc_task import ( 

64 tablename_to_task_class_dict, 

65 Task, 

66) 

67from camcops_server.cc_modules.cc_taskindex import PatientIdNumIndexEntry 

68from camcops_server.cc_modules.cc_user import User 

69 

70if TYPE_CHECKING: 

71 from sqlalchemy.sql.elements import ColumnElement 

72 from camcops_server.cc_modules.cc_request import CamcopsRequest 

73 from camcops_server.cc_modules.cc_simpleobjects import IdNumReference 

74 

75log = BraceStyleAdapter(logging.getLogger(__name__)) 

76 

77 

78# ============================================================================= 

79# Sorting helpers 

80# ============================================================================= 

81 

82class TaskClassSortMethod(Enum): 

83 """ 

84 Enum to represent ways to sort task types (classes). 

85 """ 

86 NONE = 0 

87 TABLENAME = 1 

88 SHORTNAME = 2 

89 LONGNAME = 3 

90 

91 

92def sort_task_classes_in_place(classlist: List[Type[Task]], 

93 sortmethod: TaskClassSortMethod, 

94 req: "CamcopsRequest" = None) -> None: 

95 """ 

96 Sort a list of task classes in place. 

97 

98 Args: 

99 classlist: the list of task classes 

100 sortmethod: a :class:`TaskClassSortMethod` enum 

101 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

102 """ 

103 if sortmethod == TaskClassSortMethod.TABLENAME: 

104 classlist.sort(key=lambda c: c.tablename) 

105 elif sortmethod == TaskClassSortMethod.SHORTNAME: 

106 classlist.sort(key=lambda c: c.shortname) 

107 elif sortmethod == TaskClassSortMethod.LONGNAME: 

108 assert req is not None 

109 classlist.sort(key=lambda c: c.longname(req)) 

110 

111 

112# ============================================================================= 

113# Cache task class mapping 

114# ============================================================================= 

115# Function, staticmethod, classmethod? 

116# https://stackoverflow.com/questions/8108688/in-python-when-should-i-use-a-function-instead-of-a-method # noqa 

117# https://stackoverflow.com/questions/11788195/module-function-vs-staticmethod-vs-classmethod-vs-no-decorators-which-idiom-is # noqa 

118# https://stackoverflow.com/questions/15017734/using-static-methods-in-python-best-practice # noqa 

119 

120def task_classes_from_table_names( 

121 tablenames: List[str], 

122 sortmethod: TaskClassSortMethod = TaskClassSortMethod.NONE) \ 

123 -> List[Type[Task]]: 

124 """ 

125 Transforms a list of task base tablenames into a list of task classes, 

126 appropriately sorted. 

127 

128 Args: 

129 tablenames: list of task base table names 

130 sortmethod: a :class:`TaskClassSortMethod` enum 

131 

132 Returns: 

133 a list of task classes, in the order requested 

134 

135 Raises: 

136 :exc:`KeyError` if a table name is invalid 

137 

138 """ 

139 assert sortmethod != TaskClassSortMethod.LONGNAME 

140 d = tablename_to_task_class_dict() 

141 classes = [] # type: List[Type[Task]] 

142 for tablename in tablenames: 

143 cls = d[tablename] 

144 classes.append(cls) 

145 sort_task_classes_in_place(classes, sortmethod) 

146 return classes 

147 

148 

149@cache_region_static.cache_on_arguments(function_key_generator=fkg) 

150def all_tracker_task_classes() -> List[Type[Task]]: 

151 """ 

152 Returns a list of all task classes that provide tracker information. 

153 """ 

154 return [cls for cls in Task.all_subclasses_by_shortname() 

155 if cls.provides_trackers] 

156 

157 

158# ============================================================================= 

159# Define a filter to apply to tasks 

160# ============================================================================= 

161 

162class TaskFilter(Base): 

163 """ 

164 SQLAlchemy ORM object representing task filter criteria. 

165 """ 

166 __tablename__ = "_task_filters" 

167 

168 # Lots of these could be changed into lists; for example, filtering to 

169 # multiple devices, multiple users, multiple text patterns. For 

170 # AND-joining, there is little clear benefit (one could always AND-join 

171 # multiple filters with SQL). For OR-joining, this is more useful. 

172 # - surname: use ID numbers instead; not very likely to have >1 surname 

173 # - forename: ditto 

174 # - DOB: ditto 

175 # - sex: just eliminate the filter if you don't care about sex 

176 # - task_types: needs a list 

177 # - device_id: might as well make it a list 

178 # - user_id: might as well make it a list 

179 # - group_id: might as well make it a list 

180 # - start_datetime: single only 

181 # - end_datetime: single only 

182 # - text_contents: might as well make it a list 

183 # - ID numbers: a list, joined with OR. 

184 id = Column( 

185 "id", Integer, 

186 primary_key=True, autoincrement=True, index=True, 

187 comment="Task filter ID (arbitrary integer)" 

188 ) 

189 # Task type filters 

190 task_types = Column( 

191 "task_types", StringListType, 

192 comment="Task filter: task type(s), as CSV list of table names" 

193 ) 

194 tasks_offering_trackers_only = Column( 

195 "tasks_offering_trackers_only", Boolean, 

196 comment="Task filter: restrict to tasks offering trackers only?" 

197 ) 

198 tasks_with_patient_only = Column( 

199 "tasks_with_patient_only", Boolean, 

200 comment="Task filter: restrict to tasks with a patient (non-anonymous " 

201 "tasks) only?" 

202 ) 

203 # Patient-related filters 

204 surname = Column( 

205 "surname", PatientNameColType, 

206 comment="Task filter: surname" 

207 ) 

208 forename = Column( 

209 "forename", PatientNameColType, 

210 comment="Task filter: forename" 

211 ) 

212 dob = Column( 

213 "dob", Date, 

214 comment="Task filter: DOB" 

215 ) # type: Optional[datetime.date] 

216 sex = Column( 

217 "sex", SexColType, 

218 comment="Task filter: sex" 

219 ) 

220 idnum_criteria = Column( # new in v2.0.1 

221 "idnum_criteria", IdNumReferenceListColType, 

222 comment="ID filters as JSON; the ID number definitions are joined " 

223 "with OR" 

224 ) 

225 # Other filters 

226 device_ids = Column( 

227 "device_ids", IntListType, 

228 comment="Task filter: source device ID(s), as CSV" 

229 ) 

230 adding_user_ids = Column( 

231 "user_ids", IntListType, 

232 comment="Task filter: adding (uploading) user ID(s), as CSV" 

233 ) 

234 group_ids = Column( 

235 "group_ids", IntListType, 

236 comment="Task filter: group ID(s), as CSV" 

237 ) 

238 start_datetime = Column( 

239 "start_datetime_iso8601", PendulumDateTimeAsIsoTextColType, 

240 comment="Task filter: start date/time (UTC as ISO8601)" 

241 ) # type: Union[None, Pendulum, datetime.datetime] 

242 end_datetime = Column( 

243 "end_datetime_iso8601", PendulumDateTimeAsIsoTextColType, 

244 comment="Task filter: end date/time (UTC as ISO8601)" 

245 ) # type: Union[None, Pendulum, datetime.datetime] 

246 # Implemented on the Python side for indexed lookup: 

247 text_contents = Column( 

248 "text_contents", StringListType, 

249 comment="Task filter: filter text fields" 

250 ) # task must contain ALL the strings in AT LEAST ONE of its text columns 

251 # Implemented on the Python side for non-indexed lookup: 

252 complete_only = Column( 

253 "complete_only", Boolean, 

254 comment="Task filter: task complete?" 

255 ) 

256 

257 def __init__(self) -> None: 

258 # We need to initialize these explicitly, because if we create an 

259 # instance via "x = TaskFilter()", they will be initialized to None, 

260 # without any recourse to our database to-and-fro conversion code for 

261 # each fieldtype. 

262 # (If we load from a database, things will be fine.) 

263 self.idnum_criteria = [] # type: List[IdNumReference] 

264 self.device_ids = [] # type: List[int] 

265 self.adding_user_ids = [] # type: List[int] 

266 self.group_ids = [] # type: List[int] 

267 self.text_contents = [] # type: List[str] 

268 

269 # ANYTHING YOU ADD BELOW HERE MUST ALSO BE IN init_on_load(). 

270 # Or call it, of course, but we like to keep on the happy side of the 

271 # PyCharm type checker. 

272 

273 # Python-only filtering attributes (i.e. not saved to database) 

274 self.era = None # type: Optional[str] 

275 self.finalized_only = False # used for exports 

276 self.must_have_idnum_type = None # type: Optional[int] 

277 

278 # Other Python-only attributes 

279 self._sort_method = TaskClassSortMethod.NONE 

280 self._task_classes = None # type: Optional[List[Type[Task]]] 

281 

282 @reconstructor 

283 def init_on_load(self): 

284 """ 

285 SQLAlchemy function to recreate after loading from the database. 

286 """ 

287 self.era = None # type: Optional[str] 

288 self.finalized_only = False 

289 self.must_have_idnum_type = None # type: Optional[int] 

290 

291 self._sort_method = TaskClassSortMethod.NONE 

292 self._task_classes = None # type: Optional[List[Type[Task]]] 

293 

294 def __repr__(self) -> str: 

295 return auto_repr(self, with_addr=True) 

296 

297 def set_sort_method(self, sort_method: TaskClassSortMethod) -> None: 

298 """ 

299 Sets the sorting method for task types. 

300 """ 

301 assert sort_method != TaskClassSortMethod.LONGNAME, ( 

302 "If you want to use that sorting method, you need to save a " 

303 "request object, because long task names use translation" 

304 ) 

305 self._sort_method = sort_method 

306 

307 @property 

308 def task_classes(self) -> List[Type[Task]]: 

309 """ 

310 Return a list of task classes permitted by the filter. 

311 

312 Uses caching, since the filter will be called repeatedly. 

313 """ 

314 if self._task_classes is None: 

315 self._task_classes = [] # type: List[Type[Task]] 

316 if self.task_types: 

317 starting_classes = task_classes_from_table_names( 

318 self.task_types) 

319 else: 

320 starting_classes = Task.all_subclasses_by_shortname() 

321 skip_anonymous_tasks = self.skip_anonymous_tasks() 

322 for cls in starting_classes: 

323 if (self.tasks_offering_trackers_only and 

324 not cls.provides_trackers): 

325 # Class doesn't provide trackers; skip 

326 continue 

327 if skip_anonymous_tasks and not cls.has_patient: 

328 # Anonymous task; skip 

329 continue 

330 if self.text_contents and not cls.get_text_filter_columns(): 

331 # Text filter and task has no text columns; skip 

332 continue 

333 self._task_classes.append(cls) 

334 sort_task_classes_in_place(self._task_classes, self._sort_method) 

335 return self._task_classes 

336 

337 def skip_anonymous_tasks(self) -> bool: 

338 """ 

339 Should we skip anonymous tasks? 

340 """ 

341 return self.tasks_with_patient_only or self.any_patient_filtering() 

342 

343 def offers_all_task_types(self) -> bool: 

344 """ 

345 Does this filter offer every single task class? Used for efficiency 

346 when using indexes. (Since ignored.) 

347 """ 

348 if self.tasks_offering_trackers_only: 

349 return False 

350 if self.skip_anonymous_tasks(): 

351 return False 

352 if not self.task_types: 

353 return True 

354 return set(self.task_classes) == set(Task.all_subclasses_by_shortname) 

355 

356 def offers_all_non_anonymous_task_types(self) -> bool: 

357 """ 

358 Does this filter offer every single non-anonymous task class? Used for 

359 efficiency when using indexes. 

360 """ 

361 offered_task_classes = self.task_classes 

362 for taskclass in Task.all_subclasses_by_shortname(): 

363 if taskclass.is_anonymous: 

364 continue 

365 if taskclass not in offered_task_classes: 

366 return False 

367 return True 

368 

369 @property 

370 def task_tablename_list(self) -> List[str]: 

371 """ 

372 Returns the base table names for all task types permitted by the 

373 filter. 

374 """ 

375 return [cls.__tablename__ for cls in self.task_classes] 

376 

377 def any_patient_filtering(self) -> bool: 

378 """ 

379 Is some sort of patient filtering being applied? 

380 """ 

381 return ( 

382 bool(self.surname) or 

383 bool(self.forename) or 

384 (self.dob is not None) or 

385 bool(self.sex) or 

386 bool(self.idnum_criteria) 

387 ) 

388 

389 def any_specific_patient_filtering(self) -> bool: 

390 """ 

391 Are there filters that would restrict to one or a few patients? 

392 

393 (Differs from :func:`any_patient_filtering` with respect to sex.) 

394 """ 

395 return ( 

396 bool(self.surname) or 

397 bool(self.forename) or 

398 self.dob is not None or 

399 bool(self.idnum_criteria) 

400 ) 

401 

402 def get_only_iddef(self) -> Optional["IdNumReference"]: 

403 """ 

404 If a single ID number type/value restriction is being applied, return 

405 it, as an 

406 :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`. 

407 Otherwise, return ``None``. 

408 """ 

409 if len(self.idnum_criteria) != 1: 

410 return None 

411 return self.idnum_criteria[0] 

412 

413 def get_group_names(self, req: "CamcopsRequest") -> List[str]: 

414 """ 

415 Get the names of any groups to which we are restricting. 

416 """ 

417 groups = ( 

418 req.dbsession.query(Group) 

419 .filter(Group.id.in_(self.group_ids)) 

420 .all() 

421 ) # type: List[Group] 

422 return [g.name if g and g.name else "" for g in groups] 

423 

424 def get_user_names(self, req: "CamcopsRequest") -> List[str]: 

425 """ 

426 Get the usernames of any uploading users to which we are restricting. 

427 """ 

428 users = ( 

429 req.dbsession.query(User) 

430 .filter(User.id.in_(self.adding_user_ids)) 

431 .all() 

432 ) # type: List[User] 

433 return [u.username if u and u.username else "" for u in users] 

434 

435 def get_device_names(self, req: "CamcopsRequest") -> List[str]: 

436 """ 

437 Get the names of any devices to which we are restricting. 

438 """ 

439 devices = ( 

440 req.dbsession.query(Device) 

441 .filter(Device.id.in_(self.device_ids)) 

442 .all() 

443 ) # type: List[Device] 

444 return [d.name if d and d.name else "" for d in devices] 

445 

446 def clear(self) -> None: 

447 """ 

448 Clear all parts of the filter. 

449 """ 

450 self.task_types = [] # type: List[str] 

451 

452 self.surname = None 

453 self.forename = None 

454 self.dob = None # type: Optional[datetime.date] 

455 self.sex = None 

456 self.idnum_criteria = [] # type: List[IdNumReference] 

457 

458 self.device_ids = [] # type: List[int] 

459 self.adding_user_ids = [] # type: List[int] 

460 self.group_ids = [] # type: List[int] 

461 self.start_datetime = None # type: Union[None, Pendulum, datetime.datetime] # noqa 

462 self.end_datetime = None # type: Union[None, Pendulum, datetime.datetime] # noqa 

463 self.text_contents = [] # type: List[str] 

464 

465 self.complete_only = None # type: Optional[bool] 

466 

467 def dates_inconsistent(self) -> bool: 

468 """ 

469 Are inconsistent dates specified, such that no tasks should be 

470 returned? 

471 """ 

472 return (self.start_datetime and self.end_datetime and 

473 self.end_datetime < self.start_datetime) 

474 

475 def filter_query_by_patient(self, q: Query, 

476 via_index: bool) -> Query: 

477 """ 

478 Restricts an query that has *already been joined* to the 

479 :class:`camcops_server.cc_modules.cc_patient.Patient` class, according 

480 to the patient filtering criteria. 

481 

482 Args: 

483 q: the starting SQLAlchemy ORM Query 

484 via_index: 

485 If ``True``, the query relates to a 

486 :class:`camcops_server.cc_modules.cc_taskindex.TaskIndexEntry` 

487 and we should restrict it according to the 

488 :class:`camcops_server.cc_modules.cc_taskindex.PatientIdNumIndexEntry` 

489 class. If ``False``, the query relates to a 

490 :class:`camcops_server.cc_modules.cc_taskindex.Task` and we 

491 should restrict according to 

492 :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`. 

493 

494 Returns: 

495 a revised Query 

496 

497 """ 

498 if self.surname: 

499 q = q.filter(func.upper(Patient.surname) == 

500 self.surname.upper()) 

501 if self.forename: 

502 q = q.filter(func.upper(Patient.forename) == 

503 self.forename.upper()) 

504 if self.dob is not None: 

505 q = q.filter(Patient.dob == self.dob) 

506 if self.sex: 

507 q = q.filter(func.upper(Patient.sex) == self.sex.upper()) 

508 

509 if self.idnum_criteria: 

510 id_filter_parts = [] # type: List[ColumnElement] 

511 if via_index: 

512 q = q.join(PatientIdNumIndexEntry) 

513 # "Specify possible ID number values" 

514 for iddef in self.idnum_criteria: 

515 id_filter_parts.append(and_( 

516 PatientIdNumIndexEntry.which_idnum == iddef.which_idnum, # noqa 

517 PatientIdNumIndexEntry.idnum_value == iddef.idnum_value 

518 )) 

519 # Use OR (disjunction) of the specified values: 

520 q = q.filter(or_(*id_filter_parts)) 

521 # "Must have a value for a given ID number type" 

522 if self.must_have_idnum_type: 

523 # noinspection PyComparisonWithNone,PyPep8 

524 q = q.filter(and_( 

525 PatientIdNumIndexEntry.which_idnum == self.must_have_idnum_type, # noqa 

526 PatientIdNumIndexEntry.idnum_value != None # noqa: E711 

527 )) 

528 else: 

529 # q = q.join(PatientIdNum) # fails 

530 q = q.join(Patient.idnums) 

531 # "Specify possible ID number values" 

532 for iddef in self.idnum_criteria: 

533 id_filter_parts.append(and_( 

534 PatientIdNum.which_idnum == iddef.which_idnum, 

535 PatientIdNum.idnum_value == iddef.idnum_value 

536 )) 

537 # Use OR (disjunction) of the specified values: 

538 q = q.filter(or_(*id_filter_parts)) 

539 # "Must have a value for a given ID number type" 

540 if self.must_have_idnum_type: 

541 # noinspection PyComparisonWithNone,PyPep8 

542 q = q.filter(and_( 

543 PatientIdNum.which_idnum == self.must_have_idnum_type, 

544 PatientIdNum.idnum_value != None # noqa: E711 

545 )) 

546 

547 return q 

548 

549 @property 

550 def start_datetime_utc(self) -> Optional[Pendulum]: 

551 if not self.start_datetime: 

552 return None 

553 return convert_datetime_to_utc(self.start_datetime) 

554 

555 @property 

556 def end_datetime_utc(self) -> Optional[Pendulum]: 

557 if not self.end_datetime: 

558 return None 

559 return convert_datetime_to_utc(self.end_datetime) 

560 

561 

562def encode_task_filter(taskfilter: TaskFilter) -> Dict: 

563 return { 

564 "task_types": taskfilter.task_types, 

565 "group_ids": taskfilter.group_ids, 

566 } 

567 

568 

569# noinspection PyUnusedLocal 

570def decode_task_filter(d: Dict, cls: Type) -> TaskFilter: 

571 taskfilter = TaskFilter() 

572 taskfilter.task_types = d["task_types"] 

573 taskfilter.group_ids = d["group_ids"] 

574 

575 return taskfilter 

576 

577 

578register_class_for_json( 

579 cls=TaskFilter, 

580 obj_to_dict_fn=encode_task_filter, 

581 dict_to_obj_fn=decode_task_filter 

582)