#!/usr/bin/env python

"""
camcops_server/cc_modules/cc_taskcollection.py

===============================================================================

    Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Classes to fetch tasks from the database as efficiently as possible.**

"""

from collections import OrderedDict
import datetime
from enum import Enum
import logging
from threading import Thread
from typing import (Dict, Generator, List, Optional, Tuple, Type,
                    TYPE_CHECKING, Union)

from cardinal_pythonlib.json.serialize import (
    register_class_for_json,
    register_enum_for_json,
)
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.reprfunc import auto_repr, auto_str
from cardinal_pythonlib.sort import MINTYPE_SINGLETON, MinType
from kombu.serialization import dumps, loads
from pendulum import DateTime as Pendulum
from sqlalchemy.orm import Query
from sqlalchemy.orm.session import Session as SqlASession
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.expression import and_, exists, or_

from camcops_server.cc_modules.cc_constants import ERA_NOW
from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
from camcops_server.cc_modules.cc_task import (
    tablename_to_task_class_dict,
    Task,
)
from camcops_server.cc_modules.cc_taskfactory import (
    task_query_restricted_to_permitted_users,
)
from camcops_server.cc_modules.cc_taskfilter import TaskFilter
from camcops_server.cc_modules.cc_taskindex import TaskIndexEntry

if TYPE_CHECKING:
    from sqlalchemy.sql.elements import ClauseElement, ColumnElement
    from camcops_server.cc_modules.cc_request import CamcopsRequest

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Debugging options
# =============================================================================

DEBUG_QUERY_TIMING = False

if DEBUG_QUERY_TIMING:
    log.warning("Debugging options enabled!")


# =============================================================================
# Sorting helpers
# =============================================================================

def task_when_created_sorter(task: Task) \
        -> Union[Tuple[Pendulum, datetime.datetime], MinType]:
    """
    Function to sort tasks by their creation date/time (with upload date/time
    as a tiebreak for consistent ordering).
    """
    # For sorting of tasks
    created = task.when_created
    # noinspection PyProtectedMember
    uploaded = task._when_added_batch_utc
    return MINTYPE_SINGLETON if created is None else (created, uploaded)


@register_enum_for_json
class TaskSortMethod(Enum):
    """
    Enum representing ways to sort tasks.
    """
    NONE = 0
    CREATION_DATE_ASC = 1
    CREATION_DATE_DESC = 2


def sort_tasks_in_place(tasklist: List[Task],
                        sortmethod: TaskSortMethod) -> None:
    """
    Sort a list of tasks, in place, according to ``sortmethod``.

    Args:
        tasklist: the list of tasks
        sortmethod: a :class:`TaskSortMethod` enum
    """
    # Sort?
    if sortmethod == TaskSortMethod.CREATION_DATE_ASC:
        tasklist.sort(key=task_when_created_sorter)
    elif sortmethod == TaskSortMethod.CREATION_DATE_DESC:
        tasklist.sort(key=task_when_created_sorter, reverse=True)
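

# Illustrative sketch, not part of the module: sorting an already-fetched
# list of tasks newest-first with the helpers above. Here "tasks" is a
# hypothetical List[Task] obtained elsewhere (e.g. from a TaskCollection).
#
#     sort_tasks_in_place(tasks, TaskSortMethod.CREATION_DATE_DESC)
#     most_recent = tasks[0] if tasks else None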



# =============================================================================
# Parallel fetch helper
# =============================================================================
# - Why consider a parallel fetch?
#   Because a typical fetch might involve 27ms per query (as seen by Python;
#   less as seen by MySQL) but about 100 queries, for a not-very-large
#   database.
# - Initially UNSUCCESSFUL: even after tweaking pool_size=0 in create_engine()
#   to get round the SQLAlchemy error "QueuePool limit of size 5 overflow 10
#   reached", in the parallel code, a great many queries are launched, but then
#   something goes wrong and others are started but then block -- for ages --
#   waiting for a spare database connection, or something.
# - Fixed that: I was not explicitly closing the sessions.
# - But then a major conceptual problem: anything to be lazy-loaded (e.g.
#   patient, but also patient ID, special note, BLOB...) will give this sort of
#   error: "DetachedInstanceError: Parent instance <Phq9 at 0x7fe6cce2d278> is
#   not bound to a Session; lazy load operation of attribute 'patient' cannot
#   proceed" -- for obvious reasons. And some of those operations are only
#   required on the final paginated task set, which requires aggregation across
#   all tasks.
#
# HOWEVER, the query time per table drops from ~27ms to 4-8ms if we disable
# eager loading (lazy="joined") of patients from tasks.
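
# Illustrative sketch of the lazy-load problem described above (hypothetical
# session handling; not part of the module): touching an unloaded relationship
# after its session has been closed raises DetachedInstanceError.
#
#     dbsession = req.get_bare_dbsession()
#     task = dbsession.query(Phq9).first()
#     dbsession.close()
#     task.patient  # DetachedInstanceError: lazy load after session closed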


class FetchThread(Thread):
    """
    Thread to fetch tasks in parallel.

    CURRENTLY UNUSED.
    """
    def __init__(self,
                 req: "CamcopsRequest",
                 task_class: Type[Task],
                 factory: "TaskCollection",
                 **kwargs) -> None:
        self.req = req
        self.task_class = task_class
        self.factory = factory
        self.error = False
        name = task_class.__tablename__
        super().__init__(name=name, target=None, **kwargs)

    def run(self) -> None:
        log.debug("Thread starting")
        dbsession = self.req.get_bare_dbsession()
        # noinspection PyBroadException
        try:
            # noinspection PyProtectedMember
            q = self.factory._make_query(dbsession, self.task_class)
            if q:
                tasks = q.all()  # type: List[Task]
                # https://stackoverflow.com/questions/6319207/are-lists-thread-safe  # noqa
                # https://stackoverflow.com/questions/6953351/thread-safety-in-pythons-dictionary  # noqa
                # http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm  # noqa
                # noinspection PyProtectedMember
                self.factory._tasks_by_class[self.task_class] = tasks
                log.debug("Thread finishing with results")
            else:
                log.debug("Thread finishing without results")
        except Exception:
            self.error = True
            log.error("Thread error")
        dbsession.close()


# =============================================================================
# Make a set of tasks, deferring work until things are needed
# =============================================================================

class TaskCollection(object):
    """
    Represent a potential or instantiated call to fetch tasks from the
    database.

    The caller may want them in a giant list (e.g. task viewer, CTVs), or split
    by task class (e.g. trackers).
    """
    def __init__(self,
                 req: Optional["CamcopsRequest"],
                 taskfilter: TaskFilter = None,
                 as_dump: bool = False,
                 sort_method_by_class: TaskSortMethod = TaskSortMethod.NONE,
                 sort_method_global: TaskSortMethod = TaskSortMethod.NONE,
                 current_only: bool = True,
                 via_index: bool = True,
                 export_recipient: "ExportRecipient" = None) \
            -> None:
        """
        Args:
            req:
                The
                :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
                ``None`` should only be used as a parameter when serializing
                a :class:`TaskCollection` to the back-end.
            taskfilter:
                A :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`
                object that contains any restrictions we may want to apply.
                Must be supplied unless ``export_recipient`` is supplied (in
                which case it must not be supplied).
            as_dump:
                Use the "dump" permissions rather than the "view" permissions?
            sort_method_by_class:
                How should we sort tasks within each task class?
            sort_method_global:
                How should we sort tasks overall (across all task types)?
            current_only:
                Restrict to ``_current`` tasks only?
            via_index:
                Use the server's index (faster)? (Not possible with
                ``current_only=False``.)
            export_recipient:
                A :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
        """  # noqa
        if via_index and not current_only:
            log.warning("Can't use index for non-current tasks")
            via_index = False

        self._req = req
        self._filter = taskfilter
        self._as_dump = as_dump
        self._sort_method_by_class = sort_method_by_class
        self._sort_method_global = sort_method_global
        self._current_only = current_only
        self._via_index = via_index
        self.export_recipient = export_recipient

        if export_recipient:
            # We create a new filter to reflect the export recipient.
            assert self._filter is None, (
                "Can't supply taskfilter if you supply export_recipient")
            # We can do lots of what we need with a TaskFilter().
            self._filter = TaskFilter()
            if not export_recipient.all_groups:
                self._filter.group_ids = export_recipient.group_ids
            self._filter.task_types = export_recipient.tasks
            self._filter.start_datetime = export_recipient.start_datetime_utc
            self._filter.end_datetime = export_recipient.end_datetime_utc
            self._filter.finalized_only = export_recipient.finalized_only
            self._filter.tasks_with_patient_only = not export_recipient.anonymous_ok()  # noqa
            self._filter.must_have_idnum_type = export_recipient.primary_idnum
        else:
            assert self._filter, (
                "Must supply taskfilter unless you supply export_recipient")

        self._tasks_by_class = OrderedDict()  # type: Dict[Type[Task], List[Task]]  # noqa
        self._all_tasks = None  # type: Optional[List[Task]]
        self._all_indexes = None  # type: Optional[Union[List[TaskIndexEntry], Query]]  # noqa

    def __repr__(self) -> str:
        return auto_repr(self)

    def __str__(self) -> str:
        return auto_str(self)

    # =========================================================================
    # Interface to read
    # =========================================================================

    @property
    def req(self) -> "CamcopsRequest":
        """
        Returns the associated request, or raises :exc:`AssertionError` if it
        has not been set.
        """
        assert self._req is not None, (
            "Must initialize with a request or call set_request() first"
        )
        return self._req

    def set_request(self, req: "CamcopsRequest") -> None:
        """
        Sets the request object manually. Used by Celery back-end tasks.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        self._req = req

    def task_classes(self) -> List[Type[Task]]:
        """
        Return a list of task classes that we want.
        """
        return self._filter.task_classes

    def tasks_for_task_class(self, task_class: Type[Task]) -> List[Task]:
        """
        Returns all appropriate task instances for a specific task type.
        """
        if self._via_index:
            self._ensure_everything_fetched_via_index()
        else:
            self._fetch_task_class(task_class)
        tasklist = self._tasks_by_class.get(task_class, [])
        return tasklist

    @property
    def all_tasks(self) -> List[Task]:
        """
        Returns a list of all appropriate task instances.
        """
        if self._all_tasks is None:
            if self._via_index:
                self._ensure_everything_fetched_via_index()
            else:
                self._fetch_all_tasks_without_index()
        return self._all_tasks

    @property
    def all_tasks_or_indexes_or_query(self) \
            -> Union[List[Task], List[TaskIndexEntry], Query]:
        """
        Returns a list of all appropriate task instances, or index entries, or
        a query returning them.

        - Returning a list of tasks is fine, but the results of this function
          may be paginated (e.g. in the main task view), so the end result may
          be that e.g. 20,000 tasks are fetched and 20 are shown.
        - More efficient is to fetch 20,000 indexes from the single index
          table, and fetch only the 20 tasks we need.
        - More efficient still is to fetch the 20 indexes we need, and then
          their tasks.
        """
        if not self._via_index:
            return self.all_tasks

        self._build_index_query()  # ensure self._all_indexes is set

        if self._all_tasks is not None:
            # The tasks themselves have been fetched.
            return self._all_tasks

        return self._all_indexes  # indexes or a query to fetch them
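
    # Illustrative sketch (hypothetical caller, not part of this class): a
    # paginated view might do something like
    #
    #     items = collection.all_tasks_or_indexes_or_query
    #     page = paginate(items, page_num=1, items_per_page=20)  # hypothetical helper
    #
    # so that, when "items" is still a Query, only one page's worth of rows
    # needs to be fetched from the database.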


    # def forget_task_class(self, task_class: Type[Task]) -> None:
    #     """
    #     Ditch results for a specific task class (for memory efficiency).
    #     """
    #     self._tasks_by_class.pop(task_class, None)
    #     # The "None" option prevents it from raising KeyError if the key
    #     # doesn't exist.
    #     # https://stackoverflow.com/questions/11277432/how-to-remove-a-key-from-a-python-dictionary  # noqa

    def gen_all_tasks_or_indexes(self) \
            -> Generator[Union[Task, TaskIndexEntry], None, None]:
        """
        Generates tasks or index entries.
        """
        tasks_or_indexes_or_query = self.all_tasks_or_indexes_or_query
        if isinstance(tasks_or_indexes_or_query, Query):
            for item in tasks_or_indexes_or_query.all():
                yield item
        else:
            for item in tasks_or_indexes_or_query:
                yield item

    def gen_tasks_by_class(self) -> Generator[Task, None, None]:
        """
        Generates all tasks, class-wise.
        """
        for cls in self.task_classes():
            for task in self.tasks_for_task_class(cls):
                yield task

    def gen_tasks_in_global_order(self) -> Generator[Task, None, None]:
        """
        Generates all tasks, in the global order.
        """
        for task in self.all_tasks:
            yield task

    @property
    def dbsession(self) -> SqlASession:
        """
        Returns the request's database session.
        """
        return self.req.dbsession

    # =========================================================================
    # Internals: fetching Task objects
    # =========================================================================

    def _fetch_all_tasks_without_index(self, parallel: bool = False) -> None:
        """
        Fetch all tasks from the database.
        """

        # AVOID parallel=True; see notes above.
        if DEBUG_QUERY_TIMING:
            start_time = Pendulum.now()

        if parallel:
            # Deprecated parallel fetch
            threads = []  # type: List[FetchThread]
            for task_class in self._filter.task_classes:
                thread = FetchThread(self.req, task_class, self)
                thread.start()
                threads.append(thread)
            for thread in threads:
                thread.join()
                if thread.error:
                    raise ValueError("Multithreaded fetch failed")

        else:
            # Fetch all tasks, classwise.
            for task_class in self._filter.task_classes:
                self._fetch_task_class(task_class)

        if DEBUG_QUERY_TIMING:
            end_time = Pendulum.now()
            # noinspection PyUnboundLocalVariable
            time_taken = end_time - start_time
            log.info("_fetch_all_tasks took {}", time_taken)

        # Build our joint task list
        self._all_tasks = []  # type: List[Task]
        for single_task_list in self._tasks_by_class.values():
            self._all_tasks += single_task_list
        sort_tasks_in_place(self._all_tasks, self._sort_method_global)

    def _fetch_task_class(self, task_class: Type[Task]) -> None:
        """
        Fetch tasks from the database for one task type.
        """
        if task_class in self._tasks_by_class:
            return  # already fetched
        q = self._serial_query(task_class)
        if q is None:
            newtasks = []  # type: List[Task]
        else:
            newtasks = q.all()  # type: List[Task]
            # Apply Python-side filters?
            newtasks = self._filter_through_python(newtasks)
        sort_tasks_in_place(newtasks, self._sort_method_by_class)
        self._tasks_by_class[task_class] = newtasks

    def _serial_query(self, task_class: Type[Task]) -> Optional[Query]:
        """
        Make and return an SQLAlchemy ORM query for a specific task class.

        Returns ``None`` if no tasks would match our criteria.
        """
        dbsession = self.req.dbsession
        return self._make_query(dbsession, task_class)

    def _make_query(self, dbsession: SqlASession,
                    task_class: Type[Task]) -> Optional[Query]:
        """
        Make and return an SQLAlchemy ORM query for a specific task class.

        Returns ``None`` if no tasks would match our criteria.
        """
        q = dbsession.query(task_class)

        # Restrict to what the web front end will supply
        # noinspection PyProtectedMember
        if self._current_only:
            # noinspection PyProtectedMember
            q = q.filter(task_class._current == True)  # noqa: E712

        # Restrict to what is PERMITTED
        q = task_query_restricted_to_permitted_users(
            self.req, q, task_class, as_dump=self._as_dump)

        # Restrict to what is DESIRED
        if q:
            q = self._task_query_restricted_by_filter(q, task_class)
        if q and self.export_recipient:
            q = self._task_query_restricted_by_export_recipient(q, task_class)

        return q

    def _task_query_restricted_by_filter(self,
                                         q: Query,
                                         cls: Type[Task]) -> Optional[Query]:
        """
        Restricts an SQLAlchemy ORM query for a given task class to those
        tasks that our filter permits.

        THIS IS A KEY SECURITY FUNCTION, since it implements some permissions
        that relate to viewing tasks when unfiltered.

        Args:
            q: the starting SQLAlchemy ORM Query
            cls: the task class

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter

        """
        tf = self._filter  # task filter
        user = self.req.user

        if tf.group_ids:
            permitted_group_ids = tf.group_ids.copy()
        else:
            permitted_group_ids = None  # unrestricted

        if tf.dates_inconsistent():
            return None

        if cls not in tf.task_classes:
            # We don't want this task
            return None

        if not cls.is_anonymous:
            # Not anonymous.
            if not tf.any_specific_patient_filtering():
                # No patient filtering. Permissions depend on user settings.
                if user.may_view_all_patients_when_unfiltered:
                    # May see everything. No restrictions.
                    pass
                elif user.may_view_no_patients_when_unfiltered:
                    # Can't see patient data from any group.
                    # (a) User not permitted to view any patients when
                    # unfiltered, and (b) not filtered to a level that would
                    # reasonably restrict to one or a small number of
                    # patients. Skip the task class.
                    return None
                else:
                    # May see patient data from some, but not all, groups.
                    liberal_group_ids = user.group_ids_that_nonsuperuser_may_see_when_unfiltered()  # noqa
                    if not permitted_group_ids:  # was unrestricted
                        permitted_group_ids = liberal_group_ids
                    else:  # was restricted; restrict further
                        permitted_group_ids = [
                            gid for gid in permitted_group_ids
                            if gid in liberal_group_ids
                        ]
                        if not permitted_group_ids:
                            return None  # down to zero; no point continuing

            # Patient filtering
            if tf.any_patient_filtering():
                # q = q.join(Patient)  # fails
                q = q.join(cls.patient)  # use explicitly configured relationship  # noqa
                q = tf.filter_query_by_patient(q, via_index=False)

        # Patient-independent filtering

        if tf.device_ids:
            # noinspection PyProtectedMember
            q = q.filter(cls._device_id.in_(tf.device_ids))

        if tf.era:
            # noinspection PyProtectedMember
            q = q.filter(cls._era == tf.era)
        if tf.finalized_only:
            q = q.filter(cls._era != ERA_NOW)

        if tf.adding_user_ids:
            # noinspection PyProtectedMember
            q = q.filter(cls._adding_user_id.in_(tf.adding_user_ids))

        if permitted_group_ids:
            # noinspection PyProtectedMember
            q = q.filter(cls._group_id.in_(permitted_group_ids))

        if tf.start_datetime is not None:
            q = q.filter(cls.when_created >= tf.start_datetime)
        if tf.end_datetime is not None:
            q = q.filter(cls.when_created < tf.end_datetime)

        q = self._filter_query_for_text_contents(q, cls)

        return q

    def _task_query_restricted_by_export_recipient(
            self, q: Query, cls: Type[Task]) -> Optional[Query]:
        """
        For exports.

        Filters via our
        :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`,
        except for the bits already implemented via our
        :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`.

        The main job here is for incremental exports: to find tasks that have
        not yet been exported. We look for any tasks not yet exported to a
        recipient of the same name (regardless of ``ExportRecipient.id``, which
        changes when the export recipient is reconfigured).

        Compare :meth:`_index_query_restricted_by_export_recipient`.

        Args:
            q: the starting SQLAlchemy ORM Query
            cls: the task class

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter
        """
        from camcops_server.cc_modules.cc_exportmodels import ExportedTask  # delayed import  # noqa

        r = self.export_recipient
        if not r.is_incremental():
            # Full database export; no restrictions
            return q
        # Otherwise, restrict to tasks not yet sent to this recipient.
        # noinspection PyUnresolvedReferences
        q = q.filter(
            # "There is not a successful export record for this task/recipient"
            ~exists().select_from(
                ExportedTask.__table__.join(
                    ExportRecipient.__table__,
                    ExportedTask.recipient_id == ExportRecipient.id
                )
            ).where(
                and_(
                    ExportRecipient.recipient_name == r.recipient_name,
                    ExportedTask.basetable == cls.__tablename__,
                    ExportedTask.task_server_pk == cls._pk,
                    ExportedTask.success == True,  # noqa: E712
                    ExportedTask.cancelled == False,  # noqa: E712
                )
            )
        )
        return q

    def _filter_through_python(self, tasks: List[Task]) -> List[Task]:
        """
        Returns those tasks in the list provided that pass any Python-only
        aspects of our filter (those parts not easily calculable via SQL).

        This applies to the "direct" (and not "via index") routes only. With
        the index, we can do everything via SQL.
        """
        assert not self._via_index
        if not self._has_python_parts_to_filter():
            return tasks
        return [
            t for t in tasks
            if self._task_matches_python_parts_of_filter(t)
        ]

    def _has_python_parts_to_filter(self) -> bool:
        """
        Does the filter have aspects to it that require some Python thought,
        not just a database query?

        Only applicable to the direct (not "via index") route.
        """
        assert not self._via_index
        return self._filter.complete_only

    def _task_matches_python_parts_of_filter(self, task: Task) -> bool:
        """
        Does the task pass the Python parts of the filter?

        Only applicable to the direct (not "via index") route.
        """
        assert not self._via_index

        # "Is task complete" filter
        if self._filter.complete_only:
            if not task.is_complete():
                return False

        return True

    # =========================================================================
    # Shared between Task and TaskIndexEntry methods
    # =========================================================================

    def _filter_query_for_text_contents(
            self, q: Query, taskclass: Type[Task]) -> Optional[Query]:
        """
        Returns the query, filtered for the "text contents" filter.

        Args:
            q: the starting SQLAlchemy ORM Query
            taskclass: the task class

        Returns:
            a Query, potentially modified, or ``None`` if the text filter
            cannot possibly be satisfied.
        """
        tf = self._filter  # task filter

        if not tf.text_contents:
            return q  # unmodified

        # task must contain ALL the strings in AT LEAST ONE text column
        textcols = taskclass.get_text_filter_columns()
        if not textcols:
            # Text filtering requested, but there are no text columns, so
            # by definition the filter must fail.
            return None
        clauses_over_text_phrases = []  # type: List[ColumnElement]
        # ... each e.g. "col1 LIKE '%paracetamol%' OR col2 LIKE '%paracetamol%'"  # noqa
        for textfilter in tf.text_contents:
            tf_lower = textfilter.lower()
            clauses_over_columns = []  # type: List[ColumnElement]
            # ... each e.g. "col1 LIKE '%paracetamol%'"
            for textcol in textcols:
                # Case-insensitive comparison:
                # https://groups.google.com/forum/#!topic/sqlalchemy/331XoToT4lk
                # https://bitbucket.org/zzzeek/sqlalchemy/wiki/UsageRecipes/StringComparisonFilter  # noqa
                clauses_over_columns.append(
                    func.lower(textcol).contains(tf_lower, autoescape=True)
                )
            clauses_over_text_phrases.append(
                or_(*clauses_over_columns)
            )
        return q.filter(and_(*clauses_over_text_phrases))
        # ... thus, e.g.
        # "(col1 LIKE '%paracetamol%' OR col2 LIKE '%paracetamol%') AND
        #  (col1 LIKE '%overdose%' OR col2 LIKE '%overdose%')"

    # =========================================================================
    # Internals: fetching TaskIndexEntry objects
    # =========================================================================

    def _ensure_everything_fetched_via_index(self) -> None:
        """
        Ensure we have all our tasks loaded, using the index.
        """
        self._build_index_query()
        self._fetch_tasks_from_indexes()

    def _build_index_query(self) -> None:
        """
        Creates a Query in :attr:`_all_indexes` that will fetch task indexes.
        If the task filtering requires the tasks to be fetched (i.e. text
        contents), fetches the actual tasks too (and filters them).
        """
        if self._all_indexes is not None:
            return
        self._all_indexes = self._make_index_query()
        if self._filter.text_contents:
            self._fetch_tasks_from_indexes()

    def _fetch_tasks_from_indexes(self) -> None:
        """
        Takes the query that has already been stored in :attr:`_all_indexes`,
        and populates the task attributes, :attr:`_all_tasks` and
        :attr:`_tasks_by_class`.
        """
        if self._all_tasks is not None:
            return
        assert self._all_indexes is not None

        d = tablename_to_task_class_dict()
        dbsession = self.req.dbsession
        self._all_tasks = []  # type: List[Task]

        # Fetch indexes
        if isinstance(self._all_indexes, Query):
            # Query built, but indexes not yet fetched.
            # Replace the query with actual indexes
            self._all_indexes = self._all_indexes.all()  # type: List[TaskIndexEntry]  # noqa
        indexes = self._all_indexes

        # Fetch tasks
        tablenames = set(index.task_table_name for index in indexes)
        for tablename in tablenames:
            # We do this by task class, so we can execute a single query per
            # task type (rather than per task).
            try:
                taskclass = d[tablename]
            except KeyError:
                log.warning("Bad tablename in index: {!r}", tablename)
                continue
            tasklist = self._tasks_by_class.setdefault(taskclass, [])
            task_pks = [i.task_pk for i in indexes if i.tablename == tablename]
            # noinspection PyProtectedMember
            qtask = (
                dbsession.query(taskclass)
                .filter(taskclass._pk.in_(task_pks))
            )
            qtask = self._filter_query_for_text_contents(qtask, taskclass)
            tasks = qtask.all()  # type: List[Task]
            for task in tasks:
                tasklist.append(task)
                self._all_tasks.append(task)

        # Sort tasks
        for tasklist in self._tasks_by_class.values():
            sort_tasks_in_place(tasklist, self._sort_method_by_class)
        sort_tasks_in_place(self._all_tasks, self._sort_method_global)

    def _make_index_query(self) -> Optional[Query]:
        """
        Make and return an SQLAlchemy ORM query to retrieve indexes.

        Returns ``None`` if no tasks would match our criteria.
        """
        dbsession = self.req.dbsession
        q = dbsession.query(TaskIndexEntry)

        # Restrict to what the web front end will supply
        assert self._current_only, "_current_only must be true to use index"

        # Restrict to what is PERMITTED
        if not self.export_recipient:
            q = task_query_restricted_to_permitted_users(
                self.req, q, TaskIndexEntry, as_dump=self._as_dump)

        # Restrict to what is DESIRED
        if q:
            q = self._index_query_restricted_by_filter(q)
        if q and self.export_recipient:
            q = self._index_query_restricted_by_export_recipient(q)

        return q

    def _index_query_restricted_by_filter(self, q: Query) -> Optional[Query]:
        """
        Counterpart to :func:`_task_query_restricted_by_filter`, but for
        indexes.

        THIS IS A KEY SECURITY FUNCTION, since it implements some permissions
        that relate to viewing tasks when unfiltered.

        Args:
            q: the starting SQLAlchemy ORM Query

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter

        """
        tf = self._filter  # task filter
        user = self.req.user

        if tf.group_ids:
            permitted_group_ids = tf.group_ids.copy()
        else:
            permitted_group_ids = None  # unrestricted

        if tf.dates_inconsistent():
            return None

        # Task type filtering

        if tf.skip_anonymous_tasks():
            # noinspection PyPep8
            q = q.filter(TaskIndexEntry.patient_pk != None)  # noqa: E711

        if not tf.offers_all_non_anonymous_task_types():
            permitted_task_tablenames = [
                tc.__tablename__ for tc in tf.task_classes]
            q = q.filter(TaskIndexEntry.task_table_name.in_(
                permitted_task_tablenames
            ))

        # Special rules when we've not filtered for any patients

        if not tf.any_specific_patient_filtering():
            # No patient filtering. Permissions depend on user settings.
            if user.may_view_all_patients_when_unfiltered:
                # May see everything. No restrictions.
                pass
            elif user.may_view_no_patients_when_unfiltered:
                # Can't see patient data from any group.
                # (a) User not permitted to view any patients when
                # unfiltered, and (b) not filtered to a level that would
                # reasonably restrict to one or a small number of
                # patients. Restrict to anonymous tasks.
                # noinspection PyPep8
                q = q.filter(TaskIndexEntry.patient_pk == None)  # noqa: E711
            else:
                # May see patient data from some, but not all, groups.
                # This is a little more complex than the equivalent in
                # _task_query_restricted_by_filter(), because we shouldn't
                # restrict anonymous tasks.
                liberal_group_ids = user.group_ids_that_nonsuperuser_may_see_when_unfiltered()  # noqa
                # noinspection PyPep8
                liberal_or_anon_criteria = [
                    TaskIndexEntry.patient_pk == None  # anonymous OK  # noqa: E501,E711
                ]  # type: List[ClauseElement]
                for gid in liberal_group_ids:
                    liberal_or_anon_criteria.append(
                        TaskIndexEntry.group_id == gid  # this group OK
                    )
                q = q.filter(or_(*liberal_or_anon_criteria))

        # Patient filtering

        if tf.any_patient_filtering():
            q = q.join(TaskIndexEntry.patient)  # use relationship
            q = tf.filter_query_by_patient(q, via_index=True)

        # Patient-independent filtering

        if tf.device_ids:
            # noinspection PyProtectedMember
            q = q.filter(TaskIndexEntry.device_id.in_(tf.device_ids))

        if tf.era:
            # noinspection PyProtectedMember
            q = q.filter(TaskIndexEntry.era == tf.era)
        if tf.finalized_only:
            q = q.filter(TaskIndexEntry.era != ERA_NOW)

        if tf.adding_user_ids:
            # noinspection PyProtectedMember
            q = q.filter(TaskIndexEntry.adding_user_id.in_(tf.adding_user_ids))

        if permitted_group_ids:
            # noinspection PyProtectedMember
            q = q.filter(TaskIndexEntry.group_id.in_(permitted_group_ids))

        if tf.start_datetime is not None:
            q = q.filter(TaskIndexEntry.when_created_utc >= tf.start_datetime_utc)  # noqa
        if tf.end_datetime is not None:
            q = q.filter(TaskIndexEntry.when_created_utc < tf.end_datetime_utc)  # noqa

        # text_contents is managed at the later fetch stage when using indexes

        # But is_complete can be filtered now and in SQL:
        if tf.complete_only:
            # noinspection PyPep8
            q = q.filter(TaskIndexEntry.task_is_complete == True)  # noqa: E712

        # When we use indexes, we embed the global sort criteria in the query.
        if self._sort_method_global == TaskSortMethod.CREATION_DATE_ASC:
            q = q.order_by(TaskIndexEntry.when_created_utc.asc(),
                           TaskIndexEntry.when_added_batch_utc.asc())
        elif self._sort_method_global == TaskSortMethod.CREATION_DATE_DESC:
            q = q.order_by(TaskIndexEntry.when_created_utc.desc(),
                           TaskIndexEntry.when_added_batch_utc.desc())

        return q

    def _index_query_restricted_by_export_recipient(self, q: Query) \
            -> Optional[Query]:
        """
        For exports.

        Filters via our
        :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`,
        except for the bits already implemented via our
        :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`.

        The main job here is for incremental exports: to find tasks that have
        not yet been exported.

        Compare :meth:`_task_query_restricted_by_export_recipient`.

        Args:
            q: the starting SQLAlchemy ORM Query

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter

        """
        from camcops_server.cc_modules.cc_exportmodels import ExportedTask  # delayed import  # noqa

        r = self.export_recipient
        if not r.is_incremental():
            # Full database export; no restrictions
            return q
        # Otherwise, restrict to tasks not yet sent to this recipient.
        # Remember: q is a query on TaskIndexEntry.
        # noinspection PyUnresolvedReferences
        q = q.filter(
            # "There is not a successful export record for this task/recipient"
            ~exists().select_from(
                ExportedTask.__table__.join(
                    ExportRecipient.__table__,
                    ExportedTask.recipient_id == ExportRecipient.id
                )
            ).where(
                and_(
                    ExportRecipient.recipient_name == r.recipient_name,
                    ExportedTask.basetable == TaskIndexEntry.task_table_name,
                    # ... don't use ".tablename" as a property doesn't play
                    # nicely with SQLAlchemy here
                    ExportedTask.task_server_pk == TaskIndexEntry.task_pk,
                    ExportedTask.success == True,  # noqa: E712
                    ExportedTask.cancelled == False,  # noqa: E712
                )
            )
        )
        return q
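

# Illustrative sketch, not part of the module: a typical way to build and
# consume a TaskCollection. Here "req" is a CamcopsRequest and "taskfilter"
# a configured TaskFilter; both are assumed to exist already.
#
#     collection = TaskCollection(
#         req=req,
#         taskfilter=taskfilter,
#         sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
#     )
#     for task in collection.gen_tasks_in_global_order():
#         print(task.__tablename__, task.when_created)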



# noinspection PyProtectedMember
def encode_task_collection(coll: TaskCollection) -> Dict:
    """
    Serializes a :class:`TaskCollection`.

    The request is not serialized and must be rebuilt in another way; see e.g.
    :func:`camcops_server.cc_modules.celery.email_basic_dump`.
    """
    return {
        "taskfilter": dumps(coll._filter, serializer="json"),
        "as_dump": coll._as_dump,
        "sort_method_by_class": dumps(coll._sort_method_by_class,
                                      serializer="json"),
    }


# noinspection PyUnusedLocal
def decode_task_collection(d: Dict, cls: Type) -> TaskCollection:
    """
    Creates a :class:`TaskCollection` from a serialized version.

    The request is not serialized and must be rebuilt in another way; see e.g.
    :func:`camcops_server.cc_modules.celery.email_basic_dump`.
    """
    kwargs = {
        "taskfilter": loads(*reorder_args(*d["taskfilter"])),
        "as_dump": d["as_dump"],
        "sort_method_by_class": loads(
            *reorder_args(*d["sort_method_by_class"])),
    }
    return TaskCollection(req=None, **kwargs)


def reorder_args(content_type: str,
                 content_encoding: str,
                 data: str) -> List[str]:
    """
    kombu's :func:`SerializerRegistry.dumps` returns the data as the last
    element of its tuple, but :func:`SerializerRegistry.loads` takes it as
    the first argument.
    """
    return [data, content_type, content_encoding]
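

# Illustrative sketch, not part of the module: why reorder_args() is needed.
# kombu's dumps() (as used above) returns (content_type, content_encoding,
# data), whereas loads() expects the data first, so a round trip looks like:
#
#     packed = dumps({"example": 1}, serializer="json")  # 3-tuple
#     unpacked = loads(*reorder_args(*packed))           # -> {'example': 1}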



register_class_for_json(
    cls=TaskCollection,
    obj_to_dict_fn=encode_task_collection,
    dict_to_obj_fn=decode_task_collection
)