Coverage for cc_modules/cc_taskcollection.py: 45%

376 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 15:51 +0100

1""" 

2camcops_server/cc_modules/cc_taskcollection.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Classes to fetch tasks from the database as efficiently as possible.** 

27 

28""" 

29 

30from collections import OrderedDict 

31import datetime 

32from enum import Enum 

33import logging 

34from threading import Thread 

35from typing import ( 

36 Any, 

37 Dict, 

38 Generator, 

39 List, 

40 Optional, 

41 Tuple, 

42 Type, 

43 TYPE_CHECKING, 

44 Union, 

45) 

46 

47from cardinal_pythonlib.json.serialize import ( 

48 register_class_for_json, 

49 register_enum_for_json, 

50) 

51from cardinal_pythonlib.logs import BraceStyleAdapter 

52from cardinal_pythonlib.reprfunc import auto_repr, auto_str 

53from cardinal_pythonlib.sort import MINTYPE_SINGLETON, MinType 

54from kombu.serialization import dumps, loads 

55from pendulum import DateTime as Pendulum 

56from sqlalchemy.orm import Query 

57from sqlalchemy.orm.session import Session as SqlASession 

58from sqlalchemy.sql.functions import func 

59from sqlalchemy.sql.expression import and_, exists, or_ 

60 

61from camcops_server.cc_modules.cc_constants import ERA_NOW 

62from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient 

63from camcops_server.cc_modules.cc_task import ( 

64 tablename_to_task_class_dict, 

65 Task, 

66) 

67from camcops_server.cc_modules.cc_taskfactory import ( 

68 task_query_restricted_to_permitted_users, 

69) 

70from camcops_server.cc_modules.cc_taskfilter import TaskFilter 

71from camcops_server.cc_modules.cc_taskindex import TaskIndexEntry 

72 

73if TYPE_CHECKING: 

74 from sqlalchemy.sql.elements import ClauseElement, ColumnElement 

75 from camcops_server.cc_modules.cc_request import CamcopsRequest 

76 

77log = BraceStyleAdapter(logging.getLogger(__name__)) 

78 

79 

80# ============================================================================= 

81# Debugging options 

82# ============================================================================= 

83 

# When True, _fetch_all_tasks_without_index() logs how long the fetch took.
DEBUG_QUERY_TIMING = False

if DEBUG_QUERY_TIMING:
    # Make it obvious in the log that a debugging switch has been left on.
    log.warning("Debugging options enabled!")

88 

89 

90# ============================================================================= 

91# Sorting helpers 

92# ============================================================================= 

93 

94 

def task_when_created_sorter(
    task: Task,
) -> Union[Tuple[Pendulum, datetime.datetime], MinType]:
    """
    Sort key for tasks: the creation date/time, paired with the upload batch
    date/time as a tiebreak so that the ordering is reproducible.

    Returns ``MINTYPE_SINGLETON`` instead of a tuple for a task that has no
    creation date/time.
    """
    when_created = task.when_created
    # noinspection PyProtectedMember
    when_uploaded = task._when_added_batch_utc
    if when_created is None:
        return MINTYPE_SINGLETON
    return when_created, when_uploaded

107 

108 

@register_enum_for_json
class TaskSortMethod(Enum):
    """
    The orderings that can be requested for a list of tasks.
    """

    NONE = 0  # no sorting is applied
    CREATION_DATE_ASC = 1  # ascending creation date/time
    CREATION_DATE_DESC = 2  # descending creation date/time

118 

119 

def sort_tasks_in_place(
    tasklist: List[Task], sortmethod: TaskSortMethod
) -> None:
    """
    Reorder ``tasklist`` in place as requested by ``sortmethod``.

    Args:
        tasklist: the list of tasks (modified in place)
        sortmethod: a :class:`TaskSortMethod` enum; any value other than the
            two creation-date orderings leaves the list untouched
    """
    if sortmethod == TaskSortMethod.CREATION_DATE_ASC:
        descending = False
    elif sortmethod == TaskSortMethod.CREATION_DATE_DESC:
        descending = True
    else:
        return  # nothing to do
    tasklist.sort(key=task_when_created_sorter, reverse=descending)

135 

136 

137# ============================================================================= 

138# Parallel fetch helper 

139# ============================================================================= 

140# - Why consider a parallel fetch? 

141# Because a typical fetch might involve 27ms per query (as seen by Python; 

142# less as seen by MySQL) but about 100 queries, for a not-very-large 

143# database. 

144# - Initially UNSUCCESSFUL: even after tweaking pool_size=0 in create_engine() 

145# to get round the SQLAlchemy error "QueuePool limit of size 5 overflow 10 

146# reached", in the parallel code, a great many queries are launched, but then 

147# something goes wrong and others are started but then block -- for ages -- 

148# waiting for a spare database connection, or something. 

149# - Fixed that: I was not explicitly closing the sessions. 

150# - But then a major conceptual problem: anything to be lazy-loaded (e.g. 

151# patient, but also patient ID, special note, BLOB...) will give this sort of 

152# error: "DetachedInstanceError: Parent instance <Phq9 at 0x7fe6cce2d278> is 

153# not bound to a Session; lazy load operation of attribute 'patient' cannot 

154# proceed" -- for obvious reasons. And some of those operations are only 

155# required on the final paginated task set, which requires aggregation across 

156# all tasks. 

157# 

158# HOWEVER, the query time per table drops from ~27ms to 4-8ms if we disable 

159# eager loading (lazy="joined") of patients from tasks. 

160 

161 

class FetchThread(Thread):
    """
    Thread to fetch tasks in parallel.

    CURRENTLY UNUSED.

    Each thread opens its own bare database session, runs the query for one
    task class, stores the results in the owning collection's
    ``_tasks_by_class`` dictionary, and closes the session again. Failure is
    reported via :attr:`error` rather than by raising.
    """

    def __init__(
        self,
        req: "CamcopsRequest",
        task_class: Type[Task],
        factory: "TaskCollection",
        **kwargs: Any
    ) -> None:
        """
        Args:
            req: the request, used to obtain a bare database session
            task_class: the task class whose rows this thread will fetch
            factory: the :class:`TaskCollection` that builds the query and
                receives the results
            kwargs: passed on to :class:`threading.Thread`
        """
        self.req = req
        self.task_class = task_class
        self.factory = factory
        self.error = False  # set to True by run() if anything goes wrong
        name = task_class.__tablename__
        super().__init__(name=name, target=None, **kwargs)

    def run(self) -> None:
        """
        Fetch all tasks of our class and hand them to the factory.
        """
        log.debug("Thread starting")
        dbsession = self.req.get_bare_dbsession()
        # noinspection PyBroadException
        try:
            # noinspection PyProtectedMember
            q = self.factory._make_query(dbsession, self.task_class)
            if q:
                tasks = q.all()  # type: List[Task]
                # https://stackoverflow.com/questions/6319207/are-lists-thread-safe  # noqa
                # https://stackoverflow.com/questions/6953351/thread-safety-in-pythons-dictionary  # noqa
                # http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm  # noqa
                # noinspection PyProtectedMember
                self.factory._tasks_by_class[self.task_class] = tasks
                log.debug("Thread finishing with results")
            else:
                log.debug("Thread finishing without results")
        except Exception:
            # Deliberately broad: the parent inspects self.error after
            # join(). Log with the traceback so the cause isn't lost.
            self.error = True
            log.exception("Thread error")
        finally:
            # Always release the connection; unclosed sessions previously
            # starved the pool (see the notes preceding this class).
            dbsession.close()

204 

205 

206# ============================================================================= 

207# Make a set of tasks, deferring work until things are needed 

208# ============================================================================= 

209 

210 

211class TaskCollection(object): 

212 """ 

213 Represent a potential or instantiated call to fetch tasks from the 

214 database. 

215 

216 The caller may want them in a giant list (e.g. task viewer, CTVs), or split 

217 by task class (e.g. trackers). 

218 """ 

219 

def __init__(
    self,
    req: Optional["CamcopsRequest"],
    taskfilter: TaskFilter = None,
    as_dump: bool = False,
    sort_method_by_class: TaskSortMethod = TaskSortMethod.NONE,
    sort_method_global: TaskSortMethod = TaskSortMethod.NONE,
    current_only: bool = True,
    via_index: bool = True,
    export_recipient: "ExportRecipient" = None,
) -> None:
    """
    Args:
        req:
            The
            :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
            ``None`` should only be used as a parameter when serializing
            a :class:`TaskCollection` to the back-end.
        taskfilter:
            A :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`
            describing the restrictions to apply. Exactly one of
            ``taskfilter`` and ``export_recipient`` must be given.
        as_dump:
            Use the "dump" permissions rather than the "view" permissions?
        sort_method_by_class:
            Ordering of tasks within each task class.
        sort_method_global:
            Ordering of tasks overall (across all task types).
        current_only:
            Restrict to ``_current`` tasks only?
        via_index:
            Use the server's index (faster)? Silently switched off, with
            a warning, if ``current_only`` is false.
        export_recipient:
            A :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
            from which a task filter is derived.
    """  # noqa
    if via_index and not current_only:
        log.warning("Can't use index for non-current tasks")
        via_index = False

    # Plain settings
    self._req = req
    self._as_dump = as_dump
    self._current_only = current_only
    self._via_index = via_index
    self._sort_method_by_class = sort_method_by_class
    self._sort_method_global = sort_method_global
    self.export_recipient = export_recipient
    self._filter = taskfilter

    if export_recipient:
        # Derive a fresh filter from the export recipient's settings.
        assert (
            self._filter is None
        ), "Can't supply taskfilter if you supply export_recipient"
        recipient_filter = TaskFilter()
        if not export_recipient.all_groups:
            recipient_filter.group_ids = export_recipient.group_ids
        recipient_filter.task_types = export_recipient.tasks
        recipient_filter.start_datetime = export_recipient.start_datetime_utc  # type: ignore[assignment]  # noqa: E501
        recipient_filter.end_datetime = export_recipient.end_datetime_utc  # type: ignore[assignment]  # noqa: E501
        recipient_filter.finalized_only = export_recipient.finalized_only
        recipient_filter.tasks_with_patient_only = (
            not export_recipient.anonymous_ok()
        )
        recipient_filter.must_have_idnum_type = export_recipient.primary_idnum
        self._filter = recipient_filter
    else:
        assert (
            self._filter
        ), "Must supply taskfilter unless you supply export_recipient"

    # Result caches, filled lazily.
    self._tasks_by_class = (
        OrderedDict()
    )  # type: Dict[Type[Task], List[Task]]
    self._all_tasks = None  # type: Optional[List[Task]]
    self._all_indexes = (
        None
    )  # type: Optional[Union[List[TaskIndexEntry], Query]]

299 

def __repr__(self) -> str:
    """
    Debugging representation, generated by ``auto_repr``.
    """
    description = auto_repr(self)
    return description

302 

def __str__(self) -> str:
    """
    String representation, generated by ``auto_str``.
    """
    description = auto_str(self)
    return description

305 

306 # ========================================================================= 

307 # Interface to read 

308 # ========================================================================= 

309 

310 @property 

311 def req(self) -> "CamcopsRequest": 

312 """ 

313 Returns the associated request, or raises :exc:`AssertionError` if it's 

314 not been set. 

315 """ 

316 assert ( 

317 self._req is not None 

318 ), "Must initialize with a request or call set_request() first" 

319 return self._req 

320 

321 def set_request(self, req: "CamcopsRequest") -> None: 

322 """ 

323 Sets the request object manually. Used by Celery back-end tasks. 

324 

325 Args: 

326 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

327 """ 

328 self._req = req 

329 

330 def task_classes(self) -> List[Type[Task]]: 

331 """ 

332 Return a list of task classes that we want. 

333 """ 

334 return self._filter.task_classes 

335 

336 def tasks_for_task_class(self, task_class: Type[Task]) -> List[Task]: 

337 """ 

338 Returns all appropriate task instances for a specific task type. 

339 """ 

340 if self._via_index: 

341 self._ensure_everything_fetched_via_index() 

342 else: 

343 self._fetch_task_class(task_class) 

344 tasklist = self._tasks_by_class.get(task_class, []) 

345 return tasklist 

346 

347 @property 

348 def all_tasks(self) -> List[Task]: 

349 """ 

350 Returns a list of all appropriate task instances. 

351 """ 

352 if self._all_tasks is None: 

353 if self._via_index: 

354 self._ensure_everything_fetched_via_index() 

355 else: 

356 self._fetch_all_tasks_without_index() 

357 return self._all_tasks 

358 

359 @property 

360 def all_tasks_or_indexes_or_query( 

361 self, 

362 ) -> Union[List[Task], List[TaskIndexEntry], Query]: 

363 """ 

364 Returns a list of all appropriate task instances, or index entries, or 

365 a query returning them. 

366 

367 - Returning a list of tasks is fine, but the results of this function 

368 may be paginated (e.g. in the main task view), so the end result may 

369 be that e.g. 20,000 tasks are fetched and 20 are shown. 

370 - More efficient is to fetch 20,000 indexes from the single index 

371 table, and fetch only the 20 tasks we need. 

372 - More efficient still is to fetch the 20 indexes we need, and then 

373 their task. 

374 """ 

375 if not self._via_index: 

376 return self.all_tasks 

377 

378 self._build_index_query() # ensure self._all_indexes is set 

379 

380 if self._all_tasks is not None: 

381 # The tasks themselves have been fetched. 

382 return self._all_tasks 

383 

384 return self._all_indexes # indexes or a query to fetch them 

385 

386 # def forget_task_class(self, task_class: Type[Task]) -> None: 

387 # """ 

388 # Ditch results for a specific task class (for memory efficiency). 

389 # """ 

390 # self._tasks_by_class.pop(task_class, None) 

391 # # The "None" option prevents it from raising KeyError if the key 

392 # # doesn't exist. 

393 # # https://stackoverflow.com/questions/11277432/how-to-remove-a-key-from-a-python-dictionary # noqa 

394 

395 def gen_all_tasks_or_indexes( 

396 self, 

397 ) -> Generator[Union[Task, TaskIndexEntry], None, None]: 

398 """ 

399 Generates tasks or index entries. 

400 """ 

401 tasks_or_indexes_or_query = self.all_tasks_or_indexes_or_query 

402 if isinstance(tasks_or_indexes_or_query, Query): 

403 for item in tasks_or_indexes_or_query.all(): 

404 yield item 

405 else: 

406 for item in tasks_or_indexes_or_query: 

407 yield item 

408 

409 def gen_tasks_by_class(self) -> Generator[Task, None, None]: 

410 """ 

411 Generates all tasks, class-wise. 

412 """ 

413 for cls in self.task_classes(): 

414 for task in self.tasks_for_task_class(cls): 

415 yield task 

416 

417 def gen_tasks_in_global_order(self) -> Generator[Task, None, None]: 

418 """ 

419 Generates all tasks, in the global order. 

420 """ 

421 for task in self.all_tasks: 

422 yield task 

423 

424 @property 

425 def dbsession(self) -> SqlASession: 

426 """ 

427 Returns the request's database session. 

428 """ 

429 return self.req.dbsession 

430 

431 # ========================================================================= 

432 # Internals: fetching Task objects 

433 # ========================================================================= 

434 

def _fetch_all_tasks_without_index(self, parallel: bool = False) -> None:
    """
    Fetch every wanted task class directly from its own table (not via the
    index), then build the combined, globally sorted list in
    :attr:`_all_tasks`.

    Args:
        parallel:
            Use one :class:`FetchThread` per task class. Deprecated; AVOID
            (see the notes preceding :class:`FetchThread`).

    Raises:
        :exc:`ValueError` if a thread reports an error during a parallel
        fetch.
    """
    if DEBUG_QUERY_TIMING:
        start_time = Pendulum.now()

    if not parallel:
        # Normal route: one query per task class, in sequence.
        for task_class in self._filter.task_classes:
            self._fetch_task_class(task_class)
    else:
        # Deprecated parallel fetch
        workers = []  # type: List[FetchThread]
        for task_class in self._filter.task_classes:
            worker = FetchThread(self.req, task_class, self)
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
            if worker.error:
                raise ValueError("Multithreaded fetch failed")

    if DEBUG_QUERY_TIMING:
        end_time = Pendulum.now()
        # noinspection PyUnboundLocalVariable
        time_taken = end_time - start_time
        log.info("_fetch_all_tasks took {}", time_taken)

    # Concatenate the per-class lists, then apply the global ordering.
    combined = [
        task
        for tasks_of_one_class in self._tasks_by_class.values()
        for task in tasks_of_one_class
    ]  # type: List[Task]
    self._all_tasks = combined
    sort_tasks_in_place(combined, self._sort_method_global)

472 

473 def _fetch_task_class(self, task_class: Type[Task]) -> None: 

474 """ 

475 Fetch tasks from the database for one task type. 

476 """ 

477 if task_class in self._tasks_by_class: 

478 return # already fetched 

479 q = self._serial_query(task_class) 

480 if q is None: 

481 newtasks = [] # type: List[Task] 

482 else: 

483 newtasks = q.all() # type: ignore[no-redef] # type: List[Task] 

484 # Apply Python-side filters? 

485 newtasks = self._filter_through_python(newtasks) 

486 sort_tasks_in_place(newtasks, self._sort_method_by_class) 

487 self._tasks_by_class[task_class] = newtasks 

488 

489 def _serial_query(self, task_class: Type[Task]) -> Optional[Query]: 

490 """ 

491 Make and return an SQLAlchemy ORM query for a specific task class. 

492 

493 Returns ``None`` if no tasks would match our criteria. 

494 """ 

495 dbsession = self.req.dbsession 

496 return self._make_query(dbsession, task_class) 

497 

def _make_query(
    self, dbsession: SqlASession, task_class: Type[Task]
) -> Optional[Query]:
    """
    Build the SQLAlchemy ORM query for one task class on the session
    supplied, applying in turn: the "current only" restriction, the user's
    permissions, our task filter, and (for exports) the export recipient.

    Returns ``None`` if no tasks would match our criteria.
    """
    query = dbsession.query(task_class)

    # Restrict to what the web front end will supply
    if self._current_only:
        # noinspection PyProtectedMember
        query = query.filter(task_class._current == True)  # noqa: E712

    # Restrict to what is PERMITTED
    query = task_query_restricted_to_permitted_users(
        self.req, query, task_class, as_dump=self._as_dump
    )
    if not query:
        return query  # nothing permitted; no point filtering further

    # Restrict to what is DESIRED
    query = self._task_query_restricted_by_filter(query, task_class)
    if query and self.export_recipient:
        query = self._task_query_restricted_by_export_recipient(
            query, task_class
        )

    return query

526 

# Overview: returns None early if the filter's dates are inconsistent, if the
# task class is not wanted, or if the user's "unfiltered" viewing permissions
# leave nothing visible; otherwise adds patient, device, era, adding-user,
# group, date-range and text-content criteria to the query.

527 def _task_query_restricted_by_filter( 

528 self, q: Query, cls: Type[Task] 

529 ) -> Optional[Query]: 

530 """ 

531 Restricts an SQLAlchemy ORM query for a given task class to those 

532 tasks that our filter permits. 

533 

534 THIS IS A KEY SECURITY FUNCTION, since it implements some permissions 

535 that relate to viewing tasks when unfiltered. 

536 

537 Args: 

538 q: the starting SQLAlchemy ORM Query 

539 cls: the task class 

540 

541 Returns: 

542 the original query, a modified query, or ``None`` if no tasks 

543 would pass the filter 

544 

545 """ 

546 tf = self._filter # task filter 

547 user = self.req.user 

548 

# Work on a copy, so that narrowing the list below cannot alter the filter.

549 if tf.group_ids: 

550 permitted_group_ids = tf.group_ids.copy() 

551 else: 

552 permitted_group_ids = None # unrestricted 

553 

554 if tf.dates_inconsistent(): 

555 return None 

556 

557 if cls not in tf.task_classes: 

558 # We don't want this task 

559 return None 

560 

561 if not cls.is_anonymous: 

562 # Not anonymous. 

563 if not tf.any_specific_patient_filtering(): 

564 # No patient filtering. Permissions depend on user settings. 

565 if user.may_view_all_patients_when_unfiltered: 

566 # May see everything. No restrictions. 

567 pass 

568 elif user.may_view_no_patients_when_unfiltered: 

569 # Can't see patient data from any group. 

570 # (a) User not permitted to view any patients when 

571 # unfiltered, and (b) not filtered to a level that would 

572 # reasonably restrict to one or a small number of 

573 # patients. Skip the task class. 

574 return None 

575 else: 

576 # May see patient data from some, but not all, groups. 

577 liberal_group_ids = ( 

578 user.group_ids_nonsuperuser_may_see_when_unfiltered() 

579 ) 

580 if not permitted_group_ids: # was unrestricted 

581 permitted_group_ids = liberal_group_ids 

582 else: # was restricted; restrict further 

583 permitted_group_ids = [ 

584 gid 

585 for gid in permitted_group_ids 

586 if gid in liberal_group_ids 

587 ] 

# NOTE(review): indentation is lost in this listing, so it is unclear whether
# the check below also covers the "was unrestricted" branch. It matters: an
# empty permitted_group_ids is falsy, so the group filter further down would
# be skipped rather than matching nothing. Confirm against the real source.

588 if not permitted_group_ids: 

589 return None # down to zero; no point continuing 

590 

591 # Patient filtering 

592 if tf.any_patient_filtering(): 

593 # q = q.join(Patient) # fails 

594 q = q.join( 

595 cls.patient # type: ignore[arg-type] 

596 ) # use explicitly configured relationship 

597 q = tf.filter_query_by_patient(q, via_index=False) 

598 

599 # Patient-independent filtering 

600 

601 if tf.device_ids: 

602 # noinspection PyProtectedMember 

603 q = q.filter(cls._device_id.in_(tf.device_ids)) 

604 

605 if tf.era: 

606 # noinspection PyProtectedMember 

607 q = q.filter(cls._era == tf.era) 

# finalized_only excludes records whose era is still ERA_NOW.

608 if tf.finalized_only: 

609 q = q.filter(cls._era != ERA_NOW) 

610 

611 if tf.adding_user_ids: 

612 # noinspection PyProtectedMember 

613 q = q.filter(cls._adding_user_id.in_(tf.adding_user_ids)) 

614 

# None or an empty list here means that no group restriction is added.

615 if permitted_group_ids: 

616 # noinspection PyProtectedMember 

617 q = q.filter(cls._group_id.in_(permitted_group_ids)) 

618 

# Half-open date range: start is inclusive, end is exclusive.

619 if tf.start_datetime is not None: 

620 q = q.filter(cls.when_created >= tf.start_datetime) 

621 if tf.end_datetime is not None: 

622 q = q.filter(cls.when_created < tf.end_datetime) 

623 

# The text-contents step returns None if text filtering is requested but the
# class has no text columns; that None is passed straight back to the caller.

624 q = self._filter_query_for_text_contents(q, cls) 

625 

626 return q 

627 

def _task_query_restricted_by_export_recipient(
    self, q: Query, cls: Type[Task]
) -> Optional[Query]:
    """
    For exports.

    Applies those restrictions of our
    :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
    that are not already expressed through our
    :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`.

    In practice that means incremental exports: keep only tasks for which
    there is no successful, non-cancelled export record to a recipient of
    the same *name* (the name is used, rather than ``ExportRecipient.id``,
    because the ID changes when the export recipient is reconfigured).

    Compare :meth:`_index_query_restricted_by_export_recipient`.

    Args:
        q: the starting SQLAlchemy ORM Query
        cls: the task class

    Returns:
        the original query (for non-incremental exports) or a modified
        query
    """
    from camcops_server.cc_modules.cc_exportmodels import (
        ExportedTask,
    )  # delayed import

    recipient = self.export_recipient
    if not recipient.is_incremental():
        # Full database export; no restrictions
        return q

    # Export records joined to the recipient they were sent to:
    exports_with_recipient = ExportedTask.__table__.join(
        ExportRecipient.__table__,
        ExportedTask.recipient_id == ExportRecipient.id,
    )
    # "A successful export record for this task/recipient":
    # noinspection PyUnresolvedReferences
    successful_export_of_this_task = and_(
        ExportRecipient.recipient_name == recipient.recipient_name,
        ExportedTask.basetable == cls.__tablename__,
        ExportedTask.task_server_pk == cls._pk,
        ExportedTask.success == True,  # noqa: E712
        ExportedTask.cancelled == False,  # noqa: E712
    )
    already_exported = (
        exists()
        .select_from(exports_with_recipient)
        .where(successful_export_of_this_task)
    )
    # Restrict to tasks not yet sent to this recipient.
    return q.filter(~already_exported)

684 

685 def _filter_through_python(self, tasks: List[Task]) -> List[Task]: 

686 """ 

687 Returns those tasks in the list provided that pass any Python-only 

688 aspects of our filter (those parts not easily calculable via SQL). 

689 

690 This applies to the "direct" (and not "via index") routes only. With 

691 the index, we can do everything via SQL. 

692 """ 

693 assert not self._via_index 

694 if not self._has_python_parts_to_filter(): 

695 return tasks 

696 return [ 

697 t for t in tasks if self._task_matches_python_parts_of_filter(t) 

698 ] 

699 

700 def _has_python_parts_to_filter(self) -> bool: 

701 """ 

702 Does the filter have aspects to it that require some Python thought, 

703 not just a database query? 

704 

705 Only applicable to the direct (not "via index") route. 

706 """ 

707 assert not self._via_index 

708 return self._filter.complete_only 

709 

710 def _task_matches_python_parts_of_filter(self, task: Task) -> bool: 

711 """ 

712 Does the task pass the Python parts of the filter? 

713 

714 Only applicable to the direct (not "via index") route. 

715 """ 

716 assert not self._via_index 

717 

718 # "Is task complete" filter 

719 if self._filter.complete_only: 

720 if not task.is_complete(): 

721 return False 

722 

723 return True 

724 

725 # ========================================================================= 

726 # Shared between Task and TaskIndexEntry methods 

727 # ========================================================================= 

728 

729 def _filter_query_for_text_contents( 

730 self, q: Query, taskclass: Type[Task] 

731 ) -> Optional[Query]: 

732 """ 

733 Returns the query, filtered for the "text contents" filter. 

734 

735 Args: 

736 q: the starting SQLAlchemy ORM Query 

737 taskclass: the task class 

738 

739 Returns: 

740 a Query, potentially modified. 

741 """ 

742 tf = self._filter # task filter 

743 

744 if not tf.text_contents: 

745 return q # unmodified 

746 

747 # task must contain ALL the strings in AT LEAST ONE text column 

748 textcols = taskclass.get_text_filter_columns() 

749 if not textcols: 

750 # Text filtering requested, but there are no text columns, so 

751 # by definition the filter must fail. 

752 return None 

753 clauses_over_text_phrases = [] # type: List[ColumnElement] 

754 # ... each e.g. "col1 LIKE '%paracetamol%' OR col2 LIKE '%paracetamol%'" # noqa 

755 for textfilter in tf.text_contents: 

756 tf_lower = textfilter.lower() 

757 clauses_over_columns = [] # type: List[ColumnElement] 

758 # ... each e.g. "col1 LIKE '%paracetamol%'" 

759 for textcol in textcols: 

760 # Case-insensitive comparison: 

761 # https://groups.google.com/forum/#!topic/sqlalchemy/331XoToT4lk 

762 # https://bitbucket.org/zzzeek/sqlalchemy/wiki/UsageRecipes/StringComparisonFilter # noqa 

763 clauses_over_columns.append( 

764 func.lower(textcol).contains(tf_lower, autoescape=True) 

765 ) 

766 clauses_over_text_phrases.append(or_(*clauses_over_columns)) 

767 return q.filter(and_(*clauses_over_text_phrases)) 

768 # ... thus, e.g. 

769 # "(col1 LIKE '%paracetamol%' OR col2 LIKE '%paracetamol%') AND 

770 # (col1 LIKE '%overdose%' OR col2 LIKE '%overdose%') 

771 

772 # ========================================================================= 

773 # Internals: fetching TaskIndexEntry objects 

774 # ========================================================================= 

775 

776 def _ensure_everything_fetched_via_index(self) -> None: 

777 """ 

778 Ensure we have all our tasks loaded, using the index. 

779 """ 

780 self._build_index_query() 

781 self._fetch_tasks_from_indexes() 

782 

783 def _build_index_query(self) -> None: 

784 """ 

785 Creates a Query in :attr:`_all_indexes` that will fetch task indexes. 

786 If the task filtering requires the tasks to be fetched (i.e. text 

787 contents), fetch the actual tasks too (and filter them). 

788 """ 

789 if self._all_indexes is not None: 

790 return 

791 self._all_indexes = self._make_index_query() 

792 if self._filter.text_contents: 

793 self._fetch_tasks_from_indexes() 

794 

795 def _fetch_tasks_from_indexes(self) -> None: 

796 """ 

797 Takes the query that has already been stored in :attr:`_all_indexes`, 

798 and populate the task attributes, :attr:`_all_tasks` and 

799 :attr:`_tasks_by_class`. 

800 """ 

801 if self._all_tasks is not None: 

802 return 

803 assert self._all_indexes is not None 

804 

805 d = tablename_to_task_class_dict() 

806 dbsession = self.req.dbsession 

807 self._all_tasks = [] # type: ignore[no-redef] # type: List[Task] 

808 

809 # Fetch indexes 

810 if isinstance(self._all_indexes, Query): 

811 # Query built, but indexes not yet fetched. 

812 # Replace the query with actual indexes 

813 self._all_indexes = ( # type: ignore[no-redef] 

814 self._all_indexes.all() 

815 ) # type: List[TaskIndexEntry] 

816 indexes = self._all_indexes 

817 

818 # Fetch tasks 

819 tablenames = set(index.task_table_name for index in indexes) 

820 for tablename in tablenames: 

821 # We do this by task class, so we can execute a single query per 

822 # task type (rather than per task). 

823 try: 

824 taskclass = d[tablename] 

825 except KeyError: 

826 log.warning("Bad tablename in index: {!r}", tablename) 

827 continue 

828 tasklist = self._tasks_by_class.setdefault(taskclass, []) 

829 task_pks = [i.task_pk for i in indexes if i.tablename == tablename] 

830 # noinspection PyProtectedMember 

831 qtask = dbsession.query(taskclass).filter( 

832 taskclass._pk.in_(task_pks) 

833 ) 

834 qtask = self._filter_query_for_text_contents(qtask, taskclass) 

835 tasks = qtask.all() # type: List[Task] 

836 for task in tasks: 

837 tasklist.append(task) 

838 self._all_tasks.append(task) 

839 

840 # Sort tasks 

841 for tasklist in self._tasks_by_class.values(): 

842 sort_tasks_in_place(tasklist, self._sort_method_by_class) 

843 sort_tasks_in_place(self._all_tasks, self._sort_method_global) 

844 

def _make_index_query(self) -> Optional[Query]:
    """
    Build the SQLAlchemy ORM query that retrieves task index entries,
    applying the user's permissions (skipped for exports), our task
    filter, and (for exports) the export recipient.

    Returns ``None`` if no tasks would match our criteria.
    """
    query = self.req.dbsession.query(TaskIndexEntry)

    # Restrict to what the web front end will supply
    assert self._current_only, "_current_only must be true to use index"

    # Restrict to what is PERMITTED
    if not self.export_recipient:
        query = task_query_restricted_to_permitted_users(
            self.req, query, TaskIndexEntry, as_dump=self._as_dump
        )
    if not query:
        return query  # nothing permitted; no point filtering further

    # Restrict to what is DESIRED
    query = self._index_query_restricted_by_filter(query)
    if query and self.export_recipient:
        query = self._index_query_restricted_by_export_recipient(query)

    return query

870 

def _index_query_restricted_by_filter(self, q: Query) -> Optional[Query]:
    """
    Narrow a query on the task index according to our task filter.

    Index-based counterpart to :func:`_task_query_restricted_by_filter`.

    THIS IS A KEY SECURITY FUNCTION: it implements some of the permissions
    that govern viewing tasks when no specific patient filter is applied.

    If the global sort method is by creation date (ascending or
    descending), that ordering is also embedded in the returned query.

    Args:
        q: the starting SQLAlchemy ORM Query

    Returns:
        the original query, a modified query, or ``None`` if no tasks
        would pass the filter

    """
    task_filter = self._filter
    viewer = self.req.user

    # No group IDs in the filter means "unrestricted by group".
    wanted_group_ids = (
        task_filter.group_ids.copy() if task_filter.group_ids else None
    )

    if task_filter.dates_inconsistent():
        return None

    query = q

    # -------------------------------------------------------------------
    # Task type filtering
    # -------------------------------------------------------------------

    if task_filter.skip_anonymous_tasks():
        # noinspection PyPep8
        query = query.filter(
            TaskIndexEntry.patient_pk != None  # noqa: E711
        )

    if not task_filter.offers_all_non_anonymous_task_types():
        wanted_tablenames = [
            task_cls.__tablename__ for task_cls in task_filter.task_classes
        ]
        query = query.filter(
            TaskIndexEntry.task_table_name.in_(wanted_tablenames)
        )

    # -------------------------------------------------------------------
    # Special rules when we've not filtered for any patients
    # -------------------------------------------------------------------

    if not task_filter.any_specific_patient_filtering():
        # No patient filtering, so permissions depend on user settings.
        # A user who may view all patients when unfiltered gets no extra
        # restriction here.
        if not viewer.may_view_all_patients_when_unfiltered:
            if viewer.may_view_no_patients_when_unfiltered:
                # The user may not view any patients when unfiltered, and
                # the filter is not specific enough to restrict to one or
                # a small number of patients: anonymous tasks only.
                # noinspection PyPep8
                query = query.filter(
                    TaskIndexEntry.patient_pk == None  # noqa: E711
                )
            else:
                # The user may see patient data from some, but not all,
                # groups. Unlike the equivalent code in
                # _task_query_restricted_by_filter(), anonymous tasks must
                # not be restricted, so they are OR'd in explicitly.
                visible_group_ids = (
                    viewer.group_ids_nonsuperuser_may_see_when_unfiltered()
                )
                # noinspection PyPep8
                anon_or_visible_group = [
                    TaskIndexEntry.patient_pk == None  # noqa: E711
                ]  # type: List[ClauseElement]
                anon_or_visible_group.extend(
                    TaskIndexEntry.group_id == gid
                    for gid in visible_group_ids
                )
                query = query.filter(or_(*anon_or_visible_group))  # type: ignore[arg-type]  # noqa: E501

    # -------------------------------------------------------------------
    # Patient filtering
    # -------------------------------------------------------------------

    if task_filter.any_patient_filtering():
        # Join to the patient via the ORM relationship, then let the task
        # filter add its patient-based criteria.
        query = task_filter.filter_query_by_patient(
            query.join(TaskIndexEntry.patient), via_index=True
        )

    # -------------------------------------------------------------------
    # Patient-independent filtering
    # -------------------------------------------------------------------

    if task_filter.device_ids:
        # noinspection PyProtectedMember
        query = query.filter(
            TaskIndexEntry.device_id.in_(task_filter.device_ids)
        )

    if task_filter.era:
        # noinspection PyProtectedMember
        query = query.filter(TaskIndexEntry.era == task_filter.era)
    if task_filter.finalized_only:
        query = query.filter(TaskIndexEntry.era != ERA_NOW)

    if task_filter.adding_user_ids:
        # noinspection PyProtectedMember
        query = query.filter(
            TaskIndexEntry.adding_user_id.in_(task_filter.adding_user_ids)
        )

    if wanted_group_ids:
        # noinspection PyProtectedMember
        query = query.filter(TaskIndexEntry.group_id.in_(wanted_group_ids))

    # Creation-time window: start is inclusive, end is exclusive.
    if task_filter.start_datetime is not None:
        query = query.filter(
            TaskIndexEntry.when_created_utc >= task_filter.start_datetime_utc
        )
    if task_filter.end_datetime is not None:
        query = query.filter(
            TaskIndexEntry.when_created_utc < task_filter.end_datetime_utc
        )

    # text_contents is managed at the later fetch stage when using indexes.

    # Completeness, however, can be filtered now, in SQL:
    if task_filter.complete_only:
        # noinspection PyPep8
        query = query.filter(
            TaskIndexEntry.task_is_complete == True  # noqa: E712
        )

    # When we use indexes, the global sort criteria go into the query.
    sort_method = self._sort_method_global
    if sort_method == TaskSortMethod.CREATION_DATE_ASC:
        query = query.order_by(
            TaskIndexEntry.when_created_utc.asc(),
            TaskIndexEntry.when_added_batch_utc.asc(),
        )
    elif sort_method == TaskSortMethod.CREATION_DATE_DESC:
        query = query.order_by(
            TaskIndexEntry.when_created_utc.desc(),
            TaskIndexEntry.when_added_batch_utc.desc(),
        )

    return query

999 

def _index_query_restricted_by_export_recipient(
    self, q: Query
) -> Optional[Query]:
    """
    Restrict an index query according to our export recipient.

    Used for exports. Applies the restrictions of our
    :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`,
    other than those already implemented via our
    :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`.

    The main job here is for incremental exports: to find tasks that have
    not yet been exported.

    Compare :meth:`_task_query_restricted_by_export_recipient`.

    Args:
        q: the starting SQLAlchemy ORM Query

    Returns:
        the original query, a modified query, or ``None`` if no tasks
        would pass the filter

    """
    from camcops_server.cc_modules.cc_exportmodels import (
        ExportedTask,
    )  # delayed import

    recipient = self.export_recipient
    if recipient.is_incremental():
        # Incremental export: keep only tasks not yet sent to this
        # recipient. Remember: q is a query on TaskIndexEntry.
        exports_with_recipient = ExportedTask.__table__.join(
            ExportRecipient.__table__,
            ExportedTask.recipient_id == ExportRecipient.id,
        )
        # "There is a successful, non-cancelled export record for this
        # task/recipient" -- negated below.
        # noinspection PyUnresolvedReferences
        already_exported = (
            exists()
            .select_from(exports_with_recipient)
            .where(
                and_(
                    ExportRecipient.recipient_name
                    == recipient.recipient_name,
                    ExportedTask.basetable == TaskIndexEntry.task_table_name,
                    # ... don't use ".tablename" as a property doesn't
                    # play nicely with SQLAlchemy here
                    ExportedTask.task_server_pk == TaskIndexEntry.task_pk,
                    ExportedTask.success == True,  # noqa: E712
                    ExportedTask.cancelled == False,  # noqa: E712
                )
            )
        )
        return q.filter(~already_exported)

    # Full database export; no restrictions
    return q

1057 

1058 

# noinspection PyProtectedMember
def encode_task_collection(coll: TaskCollection) -> Dict:
    """
    Convert a :class:`TaskCollection` into a JSON-friendly dictionary.

    Only the task filter, the "as dump" flag, and the per-class sort method
    are stored. The request is not serialized and must be rebuilt in another
    way; see e.g. :func:`camcops_server.cc_modules.celery.email_basic_dump`.

    Args:
        coll: the collection to serialize

    Returns:
        a dictionary with keys ``taskfilter``, ``as_dump`` and
        ``sort_method_by_class``; the first and last hold the results of
        kombu's JSON ``dumps``
    """
    filter_payload = dumps(coll._filter, serializer="json")
    as_dump = coll._as_dump
    sort_payload = dumps(coll._sort_method_by_class, serializer="json")
    return {
        "taskfilter": filter_payload,
        "as_dump": as_dump,
        "sort_method_by_class": sort_payload,
    }

1074 

1075 

# noinspection PyUnusedLocal
def decode_task_collection(d: Dict, cls: Type) -> TaskCollection:
    """
    Rebuild a :class:`TaskCollection` from its serialized dictionary form.

    The request is not serialized, so the new collection is created with
    ``req=None``; the request must be rebuilt in another way; see e.g.
    :func:`camcops_server.cc_modules.celery.email_basic_dump`.

    Args:
        d: dictionary with keys ``taskfilter``, ``as_dump`` and
            ``sort_method_by_class``
        cls: unused in this function

    Returns:
        the reconstructed :class:`TaskCollection`
    """
    # Each serialized item is unpacked as three values, which must be
    # reordered before being passed to loads().
    taskfilter = loads(*reorder_args(*d["taskfilter"]))
    as_dump = d["as_dump"]
    per_class_sort = loads(*reorder_args(*d["sort_method_by_class"]))
    return TaskCollection(
        req=None,
        taskfilter=taskfilter,
        as_dump=as_dump,
        sort_method_by_class=per_class_sort,
    )

1092 

1093 

def reorder_args(
    content_type: str, content_encoding: str, data: str
) -> List[str]:
    """
    Move the payload from last to first position.

    kombu's :func:`SerializerRegistry.dumps` returns the data as the last
    element of its tuple, whereas :func:`SerializerRegistry.loads` expects
    the data as its first argument.

    Args:
        content_type: first element of the ``dumps`` result
        content_encoding: second element of the ``dumps`` result
        data: third element of the ``dumps`` result (the payload)

    Returns:
        ``[data, content_type, content_encoding]``
    """
    loads_args = [data]
    loads_args.extend((content_type, content_encoding))
    return loads_args

1102 

1103 

# Register TaskCollection for JSON serialization, using the encoder/decoder
# pair defined above.
register_class_for_json(
    cls=TaskCollection,
    dict_to_obj_fn=decode_task_collection,
    obj_to_dict_fn=encode_task_collection,
)