Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2 

3""" 

4camcops_server/cc_modules/cc_taskindex.py 

5 

6=============================================================================== 

7 

8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com). 

9 

10 This file is part of CamCOPS. 

11 

12 CamCOPS is free software: you can redistribute it and/or modify 

13 it under the terms of the GNU General Public License as published by 

14 the Free Software Foundation, either version 3 of the License, or 

15 (at your option) any later version. 

16 

17 CamCOPS is distributed in the hope that it will be useful, 

18 but WITHOUT ANY WARRANTY; without even the implied warranty of 

19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20 GNU General Public License for more details. 

21 

22 You should have received a copy of the GNU General Public License 

23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

24 

25=============================================================================== 

26 

27**Server-side task index.** 

28 

29Note in particular that if you, as a developer, change the ``is_complete()`` 

30criteria for a task, you should cause the server index to be rebuilt (because 

31it caches ``is_complete()`` information). 

32 

33""" 

34 

35import logging 

36from typing import List, Optional, Type, TYPE_CHECKING 

37 

38from cardinal_pythonlib.logs import BraceStyleAdapter 

39from cardinal_pythonlib.reprfunc import simple_repr 

40from cardinal_pythonlib.sqlalchemy.session import get_engine_from_session 

41from cardinal_pythonlib.sqlalchemy.schema import table_exists 

42from cardinal_pythonlib.sqlalchemy.sqlserver import ( 

43 if_sqlserver_disable_constraints_triggers, 

44) 

45from pendulum import DateTime as Pendulum 

46import pyramid.httpexceptions as exc 

47from sqlalchemy.orm import relationship, Session as SqlASession 

48from sqlalchemy.sql.expression import and_, exists, join, literal, select 

49from sqlalchemy.sql.schema import Column, ForeignKey, Table 

50from sqlalchemy.sql.sqltypes import BigInteger, Boolean, DateTime, Integer 

51 

52from camcops_server.cc_modules.cc_client_api_core import ( 

53 BatchDetails, 

54 fail_user_error, 

55 UploadTableChanges, 

56) 

57from camcops_server.cc_modules.cc_constants import ERA_NOW 

58from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition 

59from camcops_server.cc_modules.cc_patient import Patient 

60from camcops_server.cc_modules.cc_patientidnum import PatientIdNum 

61from camcops_server.cc_modules.cc_sqla_coltypes import ( 

62 EraColType, 

63 isotzdatetime_to_utcdatetime, 

64 PendulumDateTimeAsIsoTextColType, 

65 TableNameColType, 

66) 

67from camcops_server.cc_modules.cc_sqlalchemy import Base 

68from camcops_server.cc_modules.cc_task import ( 

69 all_task_tablenames, 

70 tablename_to_task_class_dict, 

71 Task, 

72) 

73from camcops_server.cc_modules.cc_user import User 

74 

75if TYPE_CHECKING: 

76 from camcops_server.cc_modules.cc_request import CamcopsRequest 

77 

78log = BraceStyleAdapter(logging.getLogger(__name__)) 

79 

80 

81# ============================================================================= 

82# Helper functions 

83# ============================================================================= 

84 

def task_factory_unfiltered(dbsession: SqlASession,
                            basetable: str,
                            serverpk: int) -> Optional[Task]:
    """
    Fetch a single task by base table name and server PK, without applying
    any permission filtering. (Used by
    :class:`camcops_server.cc_modules.cc_taskindex.TaskIndexEntry`.)

    Args:
        dbsession: a :class:`sqlalchemy.orm.session.Session`
        basetable: name of the task's base table
        serverpk: server PK of the task

    Returns:
        the task, or ``None`` if no row has that PK

    Raises:
        :exc:`HTTPBadRequest` if ``basetable`` is not a known task table
    """
    task_classes = tablename_to_task_class_dict()
    try:
        task_class = task_classes[basetable]
    except KeyError:
        # Unknown table name: report it as a bad request.
        raise exc.HTTPBadRequest(f"No such task table: {basetable!r}")
    # noinspection PyProtectedMember
    return (
        dbsession.query(task_class)
        .filter(task_class._pk == serverpk)
        .first()
    )

112 

113 

114# ============================================================================= 

115# PatientIdNumIndexEntry 

116# ============================================================================= 

117 

class PatientIdNumIndexEntry(Base):
    """
    Represents a server index entry for a
    :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`.

    - Only current ID numbers are indexed.
    - Each entry stores the server PK of the ID number together with the
      server PK of the patient it was joined to (on device, era and
      client-side patient ID) when the entry was built.
    """
    __tablename__ = "_idnum_index"

    idnum_pk = Column(
        "idnum_pk", Integer,
        primary_key=True, index=True,
        comment="Server primary key of the PatientIdNum "
                "(and of the PatientIdNumIndexEntry)"
    )
    indexed_at_utc = Column(
        "indexed_at_utc", DateTime, nullable=False,
        comment="When this index entry was created"
    )

    # noinspection PyProtectedMember
    patient_pk = Column(
        "patient_pk", Integer, ForeignKey(Patient._pk),
        index=True,
        comment="Server primary key of the Patient"
    )
    which_idnum = Column(
        "which_idnum", Integer, ForeignKey(IdNumDefinition.which_idnum),
        nullable=False,
        index=True,
        comment="Which of the server's ID numbers is this?"
    )
    idnum_value = Column(
        "idnum_value", BigInteger,
        comment="The value of the ID number"
    )

    # Relationships:
    patient = relationship(Patient)

    def __repr__(self) -> str:
        return simple_repr(self, ["idnum_pk", "patient_pk",
                                  "which_idnum", "idnum_value"])

    # -------------------------------------------------------------------------
    # Create
    # -------------------------------------------------------------------------

    @classmethod
    def make_from_idnum(cls, idnum: PatientIdNum) -> "PatientIdNumIndexEntry":
        """
        Returns an ID index entry for the specified
        :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`. The
        returned index requires inserting into a database session.

        The entry is timestamped with the current time in UTC, in keeping
        with the ``indexed_at_utc`` column name.

        Args:
            idnum: a current
                :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`
        """
        # noinspection PyProtectedMember
        assert idnum._current, "Only index current PatientIdNum objects"
        index = cls()
        index.idnum_pk = idnum.pk
        index.patient_pk = idnum.get_patient_server_pk()
        index.which_idnum = idnum.which_idnum
        index.idnum_value = idnum.idnum_value
        # Use UTC explicitly: Pendulum.now() would give local time.
        index.indexed_at_utc = Pendulum.utcnow()
        return index

    @classmethod
    def index_idnum(cls, idnum: PatientIdNum, session: SqlASession) -> None:
        """
        Indexes an ID number and inserts the index into the database.

        Args:
            idnum: a
                :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`
            session:
                an SQLAlchemy Session
        """
        index = cls.make_from_idnum(idnum)
        session.add(index)

    @classmethod
    def unindex_patient(cls, patient: Patient,
                        session: SqlASession) -> None:
        """
        Removes all ID number indexes from the database for a patient.

        Args:
            patient:
                :class:`camcops_server.cc_modules.cc_patient.Patient`
            session:
                an SQLAlchemy Session
        """

        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: Table
        idxcols = idxtable.columns
        # noinspection PyProtectedMember
        session.execute(
            idxtable.delete()
            .where(idxcols.patient_pk == patient._pk)
        )

    # -------------------------------------------------------------------------
    # Regenerate index
    # -------------------------------------------------------------------------

    @classmethod
    def rebuild_idnum_index(cls, session: SqlASession,
                            indexed_at_utc: Pendulum) -> None:
        """
        Rebuilds the index entirely. Uses SQLAlchemy Core (not ORM) for speed.

        Deletes every existing entry, then issues a single INSERT ... SELECT
        covering current ID numbers (with a non-NULL value) that belong to
        current patients.

        Args:
            session: an SQLAlchemy Session
            indexed_at_utc: current time in UTC
        """
        log.info("Rebuilding patient ID number index")
        # noinspection PyUnresolvedReferences
        indextable = PatientIdNumIndexEntry.__table__  # type: Table
        indexcols = indextable.columns
        # noinspection PyUnresolvedReferences
        idnumtable = PatientIdNum.__table__  # type: Table
        idnumcols = idnumtable.columns
        # noinspection PyUnresolvedReferences
        patienttable = Patient.__table__  # type: Table
        patientcols = patienttable.columns

        # Delete all entries
        with if_sqlserver_disable_constraints_triggers(session,
                                                       indextable.name):
            session.execute(
                indextable.delete()
            )

        # Create new ones
        # noinspection PyProtectedMember,PyPep8
        session.execute(
            indextable.insert().from_select(
                # Target:
                [indexcols.idnum_pk,
                 indexcols.indexed_at_utc,
                 indexcols.patient_pk,
                 indexcols.which_idnum,
                 indexcols.idnum_value],
                # Source:
                (
                    select([idnumcols._pk,
                            literal(indexed_at_utc),
                            patientcols._pk,
                            idnumcols.which_idnum,
                            idnumcols.idnum_value])
                    .select_from(
                        join(
                            idnumtable,
                            patienttable,
                            and_(
                                idnumcols._device_id == patientcols._device_id,
                                idnumcols._era == patientcols._era,
                                idnumcols.patient_id == patientcols.id,
                            )
                        )
                    )
                    .where(idnumcols._current == True)  # noqa: E712
                    .where(idnumcols.idnum_value.isnot(None))
                    .where(patientcols._current == True)  # noqa: E712
                )
            )
        )

    # -------------------------------------------------------------------------
    # Check index
    # -------------------------------------------------------------------------
    @classmethod
    def check_index(cls, session: SqlASession,
                    show_all_bad: bool = False) -> bool:
        """
        Checks the index.

        Two checks are made, in order: (1) every index entry corresponds to a
        current ID number joined to a current patient; (2) every current ID
        number with a non-NULL value has an index entry.

        Args:
            session:
                an SQLAlchemy Session
            show_all_bad:
                show all bad entries? (If false, return upon the first)

        Returns:
            bool: is the index OK?
        """
        ok = True

        log.info(
            "Checking all patient ID number indexes represent valid entries")
        # The ON conditions must be combined with and_(): Table.join() takes
        # a single onclause, and further positional arguments would be
        # interpreted as its "isouter"/"full" flags.
        # noinspection PyUnresolvedReferences,PyProtectedMember
        q_idx_without_original = session.query(PatientIdNumIndexEntry).filter(
            ~exists()
            .select_from(
                PatientIdNum.__table__.join(
                    Patient.__table__,
                    and_(
                        Patient.id == PatientIdNum.patient_id,
                        Patient._device_id == PatientIdNum._device_id,
                        Patient._era == PatientIdNum._era,
                    )
                )
            ).where(and_(
                PatientIdNum._pk == PatientIdNumIndexEntry.idnum_pk,
                PatientIdNum._current == True,  # noqa: E712
                PatientIdNum.which_idnum == PatientIdNumIndexEntry.which_idnum,
                PatientIdNum.idnum_value == PatientIdNumIndexEntry.idnum_value,
                Patient._pk == PatientIdNumIndexEntry.patient_pk,
                Patient._current == True,  # noqa: E712
            ))
        )
        for index in q_idx_without_original:
            log.error("Patient ID number index without matching "
                      "original: {!r}", index)
            ok = False
            if not show_all_bad:
                return ok

        log.info("Checking all patient ID numbers have an index")
        # noinspection PyUnresolvedReferences,PyProtectedMember
        q_original_with_idx = session.query(PatientIdNum).filter(
            PatientIdNum._current == True,  # noqa: E712
            PatientIdNum.idnum_value.isnot(None),
            ~exists()
            .select_from(
                PatientIdNumIndexEntry.__table__
            ).where(and_(
                PatientIdNum._pk == PatientIdNumIndexEntry.idnum_pk,
                PatientIdNum.which_idnum == PatientIdNumIndexEntry.which_idnum,  # noqa
                PatientIdNum.idnum_value == PatientIdNumIndexEntry.idnum_value,  # noqa
            ))
        )
        for orig in q_original_with_idx:
            log.error("ID number without index entry: {!r}", orig)
            ok = False
            if not show_all_bad:
                return ok

        return ok

    # -------------------------------------------------------------------------
    # Update index at the point of upload from a device
    # -------------------------------------------------------------------------

    @classmethod
    def update_idnum_index_for_upload(
            cls,
            session: SqlASession,
            indexed_at_utc: Pendulum,
            tablechanges: UploadTableChanges) -> None:
        """
        Updates the index for a device's upload.

        - Deletes index entries for records that are on the way out.
        - Creates index entries for records that are on the way in.
        - Should be called after both the Patient and PatientIdNum tables are
          committed; see special ordering in
          :func:`camcops_server.cc_modules.client_api.commit_all`.

        Args:
            session:
                an SQLAlchemy Session
            indexed_at_utc:
                current time in UTC
            tablechanges:
                a :class:`camcops_server.cc_modules.cc_client_api_core.UploadTableChanges`
                object describing the changes to a table
        """  # noqa
        # noinspection PyUnresolvedReferences
        indextable = PatientIdNumIndexEntry.__table__  # type: Table
        indexcols = indextable.columns
        # noinspection PyUnresolvedReferences
        idnumtable = PatientIdNum.__table__  # type: Table
        idnumcols = idnumtable.columns
        # noinspection PyUnresolvedReferences
        patienttable = Patient.__table__  # type: Table
        patientcols = patienttable.columns

        # Delete the old
        removal_pks = tablechanges.idnum_delete_index_pks
        if removal_pks:
            log.debug("Deleting old ID number indexes: server PKs {}",
                      removal_pks)
            session.execute(
                indextable.delete()
                .where(indexcols.idnum_pk.in_(removal_pks))
            )

        # Create the new
        addition_pks = tablechanges.idnum_add_index_pks
        if addition_pks:
            log.debug("Adding ID number indexes: server PKs {}", addition_pks)
            # noinspection PyPep8,PyProtectedMember
            session.execute(
                indextable.insert().from_select(
                    # Target:
                    [indexcols.idnum_pk,
                     indexcols.indexed_at_utc,
                     indexcols.patient_pk,
                     indexcols.which_idnum,
                     indexcols.idnum_value],
                    # Source:
                    (
                        select([idnumcols._pk,
                                literal(indexed_at_utc),
                                patientcols._pk,
                                idnumcols.which_idnum,
                                idnumcols.idnum_value])
                        .select_from(
                            join(
                                idnumtable,
                                patienttable,
                                and_(
                                    idnumcols._device_id == patientcols._device_id,  # noqa
                                    idnumcols._era == patientcols._era,
                                    idnumcols.patient_id == patientcols.id,
                                )
                            )
                        )
                        .where(idnumcols._pk.in_(addition_pks))
                        .where(patientcols._current == True)  # noqa: E712
                    )
                )
            )

440 

441 

442# ============================================================================= 

443# TaskIndexEntry 

444# ============================================================================= 

445 

class TaskIndexEntry(Base):
    """
    Represents a server index entry for a
    :class:`camcops_server.cc_modules.cc_task.Task`.

    - Only current tasks are indexed. This simplifies direct linking to patient
      PKs.
    - Several properties/methods mirror those of ``Task`` so that an index
      entry can stand in for a task (duck typing).
    """
    __tablename__ = "_task_index"

    index_entry_pk = Column(
        "index_entry_pk", Integer,
        primary_key=True, autoincrement=True,
        comment="Arbitrary primary key of this index entry"
    )
    indexed_at_utc = Column(
        "indexed_at_utc", DateTime, nullable=False,
        comment="When this index entry was created"
    )

    # The next two fields link to our task:
    task_table_name = Column(
        "task_table_name", TableNameColType,
        index=True,
        comment="Table name of the task's base table"
    )
    task_pk = Column(
        "task_pk", Integer,
        index=True,
        comment="Server primary key of the task"
    )
    # We can probably even represent this with an SQLAlchemy ORM relationship.
    # This is polymorphic loading (we'll return objects of different types)
    # based on concrete table inheritance (each type of object -- each task --
    # has its own standalone table).
    # However, there are warnings about the inefficiency of this; see
    # https://docs.sqlalchemy.org/en/latest/orm/inheritance.html#concrete-table-inheritance
    # and we are trying to be efficient. So let's do via task() below.

    # This links to the task's patient, if there is one:
    # noinspection PyProtectedMember
    patient_pk = Column(
        "patient_pk", Integer, ForeignKey(Patient._pk),
        index=True,
        comment="Server primary key of the patient (if applicable)"
    )

    # These fields allow us to filter tasks efficiently:
    device_id = Column(
        "device_id", Integer, ForeignKey("_security_devices.id"),
        nullable=False,
        index=True,
        comment="ID of the source tablet device"
    )
    era = Column(
        "era", EraColType, nullable=False,
        index=True,
        comment="Era (_era) field of the source record",
    )
    when_created_utc = Column(
        "when_created_utc", DateTime, nullable=False,
        index=True,
        comment="Date/time this task instance was created (UTC)"
    )
    when_created_iso = Column(
        "when_created_iso", PendulumDateTimeAsIsoTextColType, nullable=False,
        index=True,
        comment="Date/time this task instance was created (ISO 8601)"
    )  # Pendulum on the Python side
    when_added_batch_utc = Column(
        "when_added_batch_utc", DateTime, nullable=False,
        index=True,
        comment="Date/time this task index was uploaded (UTC)"
    )
    adding_user_id = Column(
        "adding_user_id", Integer, ForeignKey("_security_users.id"),
        comment="ID of user that added this task",
    )
    group_id = Column(
        "group_id", Integer, ForeignKey("_security_groups.id"),
        nullable=False, index=True,
        comment="ID of group to which this task belongs"
    )
    task_is_complete = Column(
        "task_is_complete", Boolean, nullable=False,
        comment="Is the task complete (as judged by the server when the index "
                "entry was created)?"
    )

    # Relationships:
    patient = relationship(Patient)
    _adding_user = relationship(User)

    def __repr__(self) -> str:
        return simple_repr(self, [
            "index_entry_pk", "task_table_name", "task_pk", "patient_pk",
            "device_id", "era", "when_created_utc", "when_created_iso",
            "when_added_batch_utc",
            "adding_user_id", "group_id", "task_is_complete",
        ])

    # -------------------------------------------------------------------------
    # Fetch the task
    # -------------------------------------------------------------------------

    @property
    def task(self) -> Optional[Task]:
        """
        Returns:
            the associated :class:`camcops_server.cc_modules.cc_task.Task`, or
            ``None`` if none exists.

        Raises:
            :exc:`HTTPBadRequest` if the table doesn't exist
        """
        dbsession = SqlASession.object_session(self)
        assert dbsession, (
            "TaskIndexEntry.task called on a TaskIndexEntry "
            "that's not yet in a database session")
        return task_factory_unfiltered(
            dbsession, self.task_table_name, self.task_pk)

    # -------------------------------------------------------------------------
    # Other properties mirroring those of Task, for duck typing
    # -------------------------------------------------------------------------

    @property
    def is_anonymous(self) -> bool:
        """
        Is the task anonymous? (True when the entry has no patient PK.)
        """
        return self.patient_pk is None

    def is_complete(self) -> bool:
        """
        Is the task complete? (Returns the value cached in the index when
        the entry was created.)
        """
        return self.task_is_complete

    @property
    def _current(self) -> bool:
        """
        All task index entries represent current tasks, so this always returns
        ``True``.
        """
        return True

    @property
    def pk(self) -> int:
        """
        Returns the task's server PK.
        """
        return self.task_pk

    @property
    def tablename(self) -> str:
        """
        Returns the base table name of the task.
        """
        return self.task_table_name

    @property
    def shortname(self) -> str:
        """
        Returns the task's shortname, looked up from the task class registered
        for this entry's table name.
        """
        d = tablename_to_task_class_dict()
        taskclass = d[self.task_table_name]
        return taskclass.shortname

    def is_live_on_tablet(self) -> bool:
        """
        Is the task live on the source device (e.g. tablet)?
        """
        return self.era == ERA_NOW

    @property
    def when_created(self) -> Pendulum:
        """
        Returns the creation date/time as a Pendulum DateTime object.
        """
        return self.when_created_iso

    def any_patient_idnums_invalid(self, req: "CamcopsRequest") -> bool:
        """
        Do we have a patient who has any invalid ID numbers?

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        idnums = self.get_patient_idnum_objects()
        for idnum in idnums:
            if not idnum.is_fully_valid(req):
                return True
        return False

    def get_patient_idnum_objects(self) -> List[PatientIdNum]:
        """
        Gets all :class:`PatientIdNum` objects for the patient (an empty list
        if there is no patient).
        """
        return self.patient.get_idnum_objects() if self.patient else []

    # -------------------------------------------------------------------------
    # Create
    # -------------------------------------------------------------------------

    @classmethod
    def make_from_task(cls, task: Task,
                       indexed_at_utc: Pendulum) -> "TaskIndexEntry":
        """
        Returns a task index entry for the specified
        :class:`camcops_server.cc_modules.cc_task.Task`. The
        returned index requires inserting into a database session.

        Args:
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task`
            indexed_at_utc:
                current time in UTC
        """
        assert indexed_at_utc is not None, "Missing indexed_at_utc"

        index = cls()

        index.indexed_at_utc = indexed_at_utc

        index.task_table_name = task.tablename
        index.task_pk = task.pk

        patient = task.patient
        index.patient_pk = patient.pk if patient else None

        index.device_id = task.device_id
        index.era = task.era
        index.when_created_utc = task.get_creation_datetime_utc()
        index.when_created_iso = task.when_created
        # noinspection PyProtectedMember
        index.when_added_batch_utc = task._when_added_batch_utc
        index.adding_user_id = task.get_adding_user_id()
        index.group_id = task.group_id
        # Cached here; see is_complete().
        index.task_is_complete = task.is_complete()

        return index

    @classmethod
    def index_task(cls, task: Task, session: SqlASession,
                   indexed_at_utc: Pendulum) -> None:
        """
        Indexes a task and inserts the index into the database.

        Args:
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task`
            session:
                an SQLAlchemy Session
            indexed_at_utc:
                current time in UTC
        """
        index = cls.make_from_task(task,
                                   indexed_at_utc=indexed_at_utc)
        session.add(index)

    @classmethod
    def unindex_task(cls, task: Task, session: SqlASession) -> None:
        """
        Removes a task index from the database.

        Args:
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task`
            session:
                an SQLAlchemy Session
        """

        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: Table
        idxcols = idxtable.columns
        tasktablename = task.__class__.tablename
        session.execute(
            idxtable.delete()
            .where(idxcols.task_table_name == tasktablename)
            .where(idxcols.task_pk == task.pk)
        )

    # -------------------------------------------------------------------------
    # Regenerate index
    # -------------------------------------------------------------------------

    @classmethod
    def rebuild_index_for_task_type(cls, session: SqlASession,
                                    taskclass: Type[Task],
                                    indexed_at_utc: Pendulum,
                                    delete_first: bool = True) -> None:
        """
        Rebuilds the index for a particular task type.

        Args:
            session: an SQLAlchemy Session
            taskclass: a subclass of
                :class:`camcops_server.cc_modules.cc_task.Task`
            indexed_at_utc: current time in UTC
            delete_first: delete old index entries first? Should always be True
                unless called as part of a master rebuild that deletes
                everything first.
        """
        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: Table
        idxcols = idxtable.columns
        tasktablename = taskclass.tablename
        log.info("Rebuilding task index for {}", tasktablename)
        # Delete all entries for this task
        if delete_first:
            # The index column holding the task's table name is
            # "task_table_name" (there is no "table_name" column).
            session.execute(
                idxtable.delete()
                .where(idxcols.task_table_name == tasktablename)
            )
        # Create new entries
        # noinspection PyPep8,PyUnresolvedReferences,PyProtectedMember
        q = (
            session.query(taskclass)
            .filter(taskclass._current == True)  # noqa: E712
            .order_by(isotzdatetime_to_utcdatetime(taskclass.when_created))
        )
        for task in q:
            cls.index_task(task, session, indexed_at_utc)

    @classmethod
    def rebuild_entire_task_index(
            cls, session: SqlASession,
            indexed_at_utc: Pendulum,
            skip_tasks_with_missing_tables: bool = False) -> None:
        """
        Rebuilds the entire index.

        Args:
            session: an SQLAlchemy Session
            indexed_at_utc: current time in UTC
            skip_tasks_with_missing_tables: should we skip over tasks if their
                tables are not in the database? (This is so we can rebuild an
                index from a database upgrade, but not crash because newer
                tasks haven't had their tables created yet.)
        """
        log.info("Rebuilding entire task index")
        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: Table

        # Delete all entries
        with if_sqlserver_disable_constraints_triggers(session,
                                                       idxtable.name):
            session.execute(
                idxtable.delete()
            )

        # Now rebuild:
        for taskclass in Task.all_subclasses_by_tablename():
            if skip_tasks_with_missing_tables:
                basetable = taskclass.tablename
                engine = get_engine_from_session(session)
                if not table_exists(engine, basetable):
                    continue
            # Everything was deleted above, so no per-task delete is needed.
            cls.rebuild_index_for_task_type(session, taskclass,
                                            indexed_at_utc,
                                            delete_first=False)

    # -------------------------------------------------------------------------
    # Update index at the point of upload from a device
    # -------------------------------------------------------------------------

    @classmethod
    def update_task_index_for_upload(cls,
                                     session: SqlASession,
                                     tablechanges: UploadTableChanges,
                                     indexed_at_utc: Pendulum) -> None:
        """
        Updates the index for a device's upload.

        - Deletes index entries for records that are on the way out.
        - Creates index entries for records that are on the way in.
        - Deletes/recreates index entries for records being preserved.

        Args:
            session:
                an SQLAlchemy Session
            tablechanges:
                a :class:`camcops_server.cc_modules.cc_client_api_core.UploadTableChanges`
                object describing the changes to a table
            indexed_at_utc:
                current time in UTC
        """  # noqa
        tasktablename = tablechanges.tablename
        d = tablename_to_task_class_dict()
        try:
            taskclass = d[tasktablename]  # may raise KeyError
        except KeyError:
            # NOTE(review): presumably fail_user_error() raises, so that
            # "taskclass" is never used unbound below -- confirm.
            fail_user_error(f"Bug: no such task table: {tasktablename!r}")

        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: Table
        idxcols = idxtable.columns

        # Delete the old.
        delete_index_pks = tablechanges.task_delete_index_pks
        if delete_index_pks:
            log.debug("Deleting old task indexes: {}, server PKs {}",
                      tasktablename, delete_index_pks)
            # noinspection PyProtectedMember
            session.execute(
                idxtable.delete()
                .where(idxcols.task_table_name == tasktablename)
                .where(idxcols.task_pk.in_(delete_index_pks))
            )

        # Create the new.
        reindex_pks = tablechanges.task_reindex_pks
        if reindex_pks:
            log.debug("Recreating task indexes: {}, server PKs {}",
                      tasktablename, reindex_pks)
            # noinspection PyUnboundLocalVariable,PyProtectedMember
            q = (
                session.query(taskclass)
                .filter(taskclass._pk.in_(reindex_pks))
            )
            for task in q:
                cls.index_task(task, session,
                               indexed_at_utc=indexed_at_utc)

    # -------------------------------------------------------------------------
    # Check index
    # -------------------------------------------------------------------------
    @classmethod
    def check_index(cls, session: SqlASession,
                    show_all_bad: bool = False) -> bool:
        """
        Checks the index.

        Two passes are made over all task classes: first that every index
        entry has a matching current task, then that every current task has
        an index entry.

        Args:
            session:
                an SQLAlchemy Session
            show_all_bad:
                show all bad entries? (If false, return upon the first)

        Returns:
            bool: is the index OK?
        """
        ok = True

        log.info("Checking all task indexes represent valid entries")
        for taskclass in Task.all_subclasses_by_tablename():
            tasktablename = taskclass.tablename
            log.debug("Checking {}", tasktablename)
            # noinspection PyUnresolvedReferences,PyProtectedMember
            q_idx_without_original = session.query(TaskIndexEntry).filter(
                TaskIndexEntry.task_table_name == tasktablename,
                ~exists()
                .select_from(taskclass.__table__)
                .where(and_(
                    TaskIndexEntry.task_pk == taskclass._pk,
                    taskclass._current == True,  # noqa: E712
                ))
            )
            # No check for a valid patient at this time.
            for index in q_idx_without_original:
                log.error("Task index without matching original: {!r}", index)
                ok = False
                if not show_all_bad:
                    return ok

        log.info("Checking all tasks have an index")
        for taskclass in Task.all_subclasses_by_tablename():
            tasktablename = taskclass.tablename
            log.debug("Checking {}", tasktablename)
            # noinspection PyUnresolvedReferences,PyProtectedMember
            q_original_with_idx = session.query(taskclass).filter(
                taskclass._current == True,  # noqa: E712
                ~exists().select_from(
                    TaskIndexEntry.__table__
                ).where(and_(
                    TaskIndexEntry.task_pk == taskclass._pk,
                    TaskIndexEntry.task_table_name == tasktablename,
                ))
            )
            for orig in q_original_with_idx:
                log.error("Task without index entry: {!r}", orig)
                ok = False
                if not show_all_bad:
                    return ok

        return ok

934 

935 

936# ============================================================================= 

937# Wide-ranging index update functions 

938# ============================================================================= 

939 

def reindex_everything(session: SqlASession,
                       skip_tasks_with_missing_tables: bool = False) -> None:
    """
    Empties and then regenerates every server index table: first the patient
    ID number index, then the task index. Both are stamped with the same
    UTC time, taken once at the start.

    Args:
        session: an SQLAlchemy Session
        skip_tasks_with_missing_tables: should we skip over tasks if their
            tables are not in the database? (This is so we can rebuild an index
            from a database upgrade, but not crash because newer tasks haven't
            had their tables created yet.)
    """
    indexed_at_utc = Pendulum.utcnow()
    log.info("Reindexing database; indexed_at_utc = {}", indexed_at_utc)

    # Patient ID numbers first...
    PatientIdNumIndexEntry.rebuild_idnum_index(session, indexed_at_utc)

    # ... then tasks.
    TaskIndexEntry.rebuild_entire_task_index(
        session,
        indexed_at_utc,
        skip_tasks_with_missing_tables=skip_tasks_with_missing_tables,
    )

958 

959 

def update_indexes_and_push_exports(req: "CamcopsRequest",
                                    batchdetails: BatchDetails,
                                    tablechanges: UploadTableChanges) -> None:
    """
    Update server indexes, if required.

    Also triggers background jobs to export "new arrivals", if required.

    - For the patient ID number table: updates the ID number index.
    - For a task table: updates the task index, then registers an export
      push request for each (recipient, task PK) pair that the table changes
      report.
    - For any other table: does nothing.

    Args:
        req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        batchdetails: the :class:`BatchDetails`
        tablechanges:
            a :class:`camcops_server.cc_modules.cc_client_api_core.UploadTableChanges`
            object describing the changes to a table
    """  # noqa
    tablename = tablechanges.tablename

    if tablename == PatientIdNum.__tablename__:
        # Update idnum index
        PatientIdNumIndexEntry.update_idnum_index_for_upload(
            session=req.dbsession,
            indexed_at_utc=batchdetails.batchtime,
            tablechanges=tablechanges,
        )
        return

    if tablename not in all_task_tablenames():
        # Neither an ID number table nor a task table: nothing to index.
        return

    # Update task index
    TaskIndexEntry.update_task_index_for_upload(
        session=req.dbsession,
        tablechanges=tablechanges,
        indexed_at_utc=batchdetails.batchtime
    )

    # Push exports
    push_recipients = req.all_push_recipients
    group_id = req.user.upload_group_id
    for recipient in push_recipients:
        name = recipient.recipient_name
        export_pks = tablechanges.get_task_push_export_pks(
            recipient=recipient,
            uploading_group_id=group_id)
        for task_pk in export_pks:
            req.add_export_push_request(name, tablename, task_pk)
            # ... will be transmitted *after* the request performs COMMIT

1000 

1001 

def check_indexes(session: SqlASession, show_all_bad: bool = False) -> bool:
    """
    Checks all server index tables: the patient ID number index first, then
    the task index. The outcome of each check is logged.

    Args:
        session:
            an SQLAlchemy Session
        show_all_bad:
            show all bad entries? (If false, return upon the first)

    Returns:
        bool: are the indexes OK?
    """
    idnum_index_ok = PatientIdNumIndexEntry.check_index(session, show_all_bad)
    if not idnum_index_ok:
        log.error("Patient ID number index is bad")
        if not show_all_bad:
            # Stop at the first failure; the task index is not checked.
            return False
    else:
        log.info("Patient ID number index is good")

    task_index_ok = TaskIndexEntry.check_index(session, show_all_bad)
    if not task_index_ok:
        log.error("Task index is bad")
    else:
        log.info("Task index is good")

    return idnum_index_ok and task_index_ok