Coverage for cc_modules/cc_taskindex.py: 56%

294 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 15:51 +0100

1""" 

2camcops_server/cc_modules/cc_taskindex.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Server-side task index.** 

27 

28Note in particular that if you, as a developer, change the ``is_complete()`` 

29criteria for a task, you should cause the server index to be rebuilt (because 

30it caches ``is_complete()`` information). 

31 

32""" 

33 

34import datetime 

35import logging 

36from typing import Any, List, Optional, Type, TYPE_CHECKING 

37 

38from cardinal_pythonlib.datetimefunc import pendulum_to_datetime 

39from cardinal_pythonlib.logs import BraceStyleAdapter 

40from cardinal_pythonlib.reprfunc import simple_repr 

41from cardinal_pythonlib.sqlalchemy.session import get_engine_from_session 

42from cardinal_pythonlib.sqlalchemy.schema import table_exists 

43from cardinal_pythonlib.sqlalchemy.sqlserver import ( 

44 if_sqlserver_disable_constraints_triggers, 

45) 

46from pendulum import DateTime as Pendulum 

47import pyramid.httpexceptions as exc 

48from sqlalchemy.orm import ( 

49 Mapped, 

50 mapped_column, 

51 relationship, 

52 Session as SqlASession, 

53) 

54from sqlalchemy.sql.expression import and_, exists, join, literal, select 

55from sqlalchemy.sql.schema import ForeignKey 

56from sqlalchemy.sql.sqltypes import BigInteger 

57 

58from camcops_server.cc_modules.cc_client_api_core import ( 

59 BatchDetails, 

60 fail_user_error, 

61 UploadTableChanges, 

62) 

63from camcops_server.cc_modules.cc_constants import ERA_NOW 

64from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition 

65from camcops_server.cc_modules.cc_patient import Patient 

66from camcops_server.cc_modules.cc_patientidnum import PatientIdNum 

67from camcops_server.cc_modules.cc_sqla_coltypes import ( 

68 EraColType, 

69 isotzdatetime_to_utcdatetime, 

70 PendulumDateTimeAsIsoTextColType, 

71 TableNameColType, 

72) 

73from camcops_server.cc_modules.cc_sqlalchemy import Base 

74from camcops_server.cc_modules.cc_task import ( 

75 all_task_tablenames, 

76 tablename_to_task_class_dict, 

77 Task, 

78) 

79from camcops_server.cc_modules.cc_user import User 

80 

81if TYPE_CHECKING: 

82 from sqlalchemy.sql.elements import ColumnElement 

83 

84 from camcops_server.cc_modules.cc_request import CamcopsRequest 

85 

86log = BraceStyleAdapter(logging.getLogger(__name__)) 

87 

88 

89# ============================================================================= 

90# Helper functions 

91# ============================================================================= 

92 

93 

def task_factory_unfiltered(
    dbsession: SqlASession, basetable: str, serverpk: int
) -> Optional[Task]:
    """
    Fetches one task from the database by base table name and server PK.
    No permission filtering is applied. (Used by
    :class:`camcops_server.cc_modules.cc_taskindex.TaskIndexEntry`.)

    Args:
        dbsession: a :class:`sqlalchemy.orm.session.Session`
        basetable: name of the task's base table
        serverpk: server PK of the task

    Returns:
        the task, or ``None`` if no row has that PK

    Raises:
        :exc:`HTTPBadRequest` if no known task class uses ``basetable``
    """
    table_to_class = tablename_to_task_class_dict()
    try:
        taskclass = table_to_class[basetable]
    except KeyError:
        raise exc.HTTPBadRequest(f"No such task table: {basetable!r}")
    # noinspection PyProtectedMember
    return (
        dbsession.query(taskclass)
        .filter(taskclass._pk == serverpk)
        .first()
    )

121 

122 

123# ============================================================================= 

124# PatientIdNumIndexEntry 

125# ============================================================================= 

126 

127 

class PatientIdNumIndexEntry(Base):
    """
    Represents a server index entry for a
    :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`.

    - Only current ID numbers are indexed.
    """

    __tablename__ = "_idnum_index"

    idnum_pk: Mapped[int] = mapped_column(
        primary_key=True,
        index=True,
        comment="Server primary key of the PatientIdNum "
        "(and of the PatientIdNumIndexEntry)",
    )
    indexed_at_utc: Mapped[datetime.datetime] = mapped_column(
        comment="When this index entry was created",
    )

    # noinspection PyProtectedMember
    patient_pk: Mapped[Optional[int]] = mapped_column(
        ForeignKey(Patient._pk),
        index=True,
        comment="Server primary key of the Patient",
    )
    which_idnum: Mapped[int] = mapped_column(
        ForeignKey(IdNumDefinition.which_idnum),
        index=True,
        comment="Which of the server's ID numbers is this?",
    )
    idnum_value: Mapped[Optional[int]] = mapped_column(
        BigInteger, comment="The value of the ID number"
    )

    # Relationships:
    patient = relationship(Patient)

    def __repr__(self) -> str:
        return simple_repr(
            self, ["idnum_pk", "patient_pk", "which_idnum", "idnum_value"]
        )

    # -------------------------------------------------------------------------
    # Create
    # -------------------------------------------------------------------------

    @classmethod
    def make_from_idnum(cls, idnum: PatientIdNum) -> "PatientIdNumIndexEntry":
        """
        Returns an ID index entry for the specified
        :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`. The
        returned index requires inserting into a database session.

        Args:
            idnum:
                a current
                :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`

        Raises:
            :exc:`AssertionError` if ``idnum`` is not current
        """
        # noinspection PyProtectedMember
        assert idnum._current, "Only index current PatientIdNum objects"
        index = cls()
        index.idnum_pk = idnum.pk
        index.patient_pk = idnum.get_patient_server_pk()
        index.which_idnum = idnum.which_idnum
        index.idnum_value = idnum.idnum_value
        # The column is UTC by name; Pendulum.now() would give local time.
        index.indexed_at_utc = Pendulum.utcnow()
        return index

    @classmethod
    def index_idnum(cls, idnum: PatientIdNum, session: SqlASession) -> None:
        """
        Indexes an ID number and adds the index to the database session.

        Args:
            idnum: a
                :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`
            session:
                an SQLAlchemy Session
        """
        index = cls.make_from_idnum(idnum)
        session.add(index)

    @classmethod
    def unindex_patient(cls, patient: Patient, session: SqlASession) -> None:
        """
        Removes all ID number indexes from the database for a patient.

        Args:
            patient:
                :class:`camcops_server.cc_modules.cc_patient.Patient`
            session:
                an SQLAlchemy Session
        """
        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: ignore[assignment]  # type: Table
        idxcols = idxtable.columns
        # noinspection PyProtectedMember
        session.execute(
            idxtable.delete().where(  # type: ignore[attr-defined]
                idxcols.patient_pk == patient._pk
            )
        )

    # -------------------------------------------------------------------------
    # Regenerate index
    # -------------------------------------------------------------------------

    @classmethod
    def rebuild_idnum_index(
        cls, session: SqlASession, indexed_at_utc: Pendulum
    ) -> None:
        """
        Rebuilds the index entirely. Uses SQLAlchemy Core (not ORM) for speed.

        Only ID numbers that are current, have a non-NULL value, and belong to
        a current patient are indexed.

        Args:
            session: an SQLAlchemy Session
            indexed_at_utc: current time in UTC
        """
        log.info("Rebuilding patient ID number index")

        # noinspection PyUnresolvedReferences
        indextable = PatientIdNumIndexEntry.__table__  # type: ignore[assignment]  # noqa: E501
        indexcols = indextable.columns
        # noinspection PyUnresolvedReferences
        idnumtable = PatientIdNum.__table__  # type: ignore[assignment]
        idnumcols = idnumtable.columns
        # noinspection PyUnresolvedReferences
        patienttable = Patient.__table__  # type: ignore[assignment]
        patientcols = patienttable.columns

        # Delete all entries
        with if_sqlserver_disable_constraints_triggers(
            session, indextable.name  # type: ignore[attr-defined]
        ):
            session.execute(indextable.delete())  # type: ignore[attr-defined]

        # Create new ones
        indexed_at_dt = pendulum_to_datetime(indexed_at_utc)
        # ... SQLite has trouble otherwise
        # noinspection PyProtectedMember,PyPep8
        session.execute(
            indextable.insert().from_select(  # type: ignore[attr-defined]
                # Target:
                [
                    indexcols.idnum_pk,
                    indexcols.indexed_at_utc,
                    indexcols.patient_pk,
                    indexcols.which_idnum,
                    indexcols.idnum_value,
                ],
                # Source:
                (
                    select(
                        idnumcols._pk,
                        literal(indexed_at_dt),
                        patientcols._pk,
                        idnumcols.which_idnum,
                        idnumcols.idnum_value,
                    )
                    .select_from(
                        join(
                            idnumtable,
                            patienttable,
                            and_(
                                idnumcols._device_id == patientcols._device_id,
                                idnumcols._era == patientcols._era,
                                idnumcols.patient_id == patientcols.id,
                            ),
                        )
                    )
                    .where(idnumcols._current == True)  # noqa: E712
                    .where(idnumcols.idnum_value.isnot(None))
                    .where(patientcols._current == True)  # noqa: E712
                ),
            )
        )

    # -------------------------------------------------------------------------
    # Check index
    # -------------------------------------------------------------------------
    @classmethod
    def check_index(
        cls, session: SqlASession, show_all_bad: bool = False
    ) -> bool:
        """
        Checks the index.

        Two checks are made: (1) every index entry corresponds to a current
        ID number of a current patient, with matching values; (2) every
        current ID number with a non-NULL value has a matching index entry.

        Args:
            session:
                an SQLAlchemy Session
            show_all_bad:
                show all bad entries? (If false, return upon the first)

        Returns:
            bool: is the index OK?
        """
        ok = True

        log.info(
            "Checking all patient ID number indexes represent valid entries"
        )
        # All three conditions must go into a single ON clause via and_();
        # Table.join()'s third and fourth positional parameters are
        # "isouter" and "full", not further join conditions.
        # noinspection PyUnresolvedReferences,PyProtectedMember
        q_idx_without_original = session.query(PatientIdNumIndexEntry).filter(
            ~exists()
            .select_from(
                PatientIdNum.__table__.join(
                    Patient.__table__,
                    and_(
                        Patient.id == PatientIdNum.patient_id,
                        Patient._device_id == PatientIdNum._device_id,
                        Patient._era == PatientIdNum._era,
                    ),
                )
            )
            .where(
                and_(
                    PatientIdNum._pk == PatientIdNumIndexEntry.idnum_pk,
                    PatientIdNum._current == True,  # noqa: E712
                    PatientIdNum.which_idnum
                    == PatientIdNumIndexEntry.which_idnum,
                    PatientIdNum.idnum_value
                    == PatientIdNumIndexEntry.idnum_value,
                    Patient._pk == PatientIdNumIndexEntry.patient_pk,
                    Patient._current == True,  # noqa: E712
                )
            )
        )
        for index in q_idx_without_original:
            log.error(
                "Patient ID number index without matching original: {!r}",
                index,
            )
            ok = False
            if not show_all_bad:
                return ok

        log.info("Checking all patient ID numbers have an index")
        # noinspection PyUnresolvedReferences,PyProtectedMember
        q_original_without_idx = session.query(PatientIdNum).filter(
            PatientIdNum._current == True,  # noqa: E712
            PatientIdNum.idnum_value.isnot(None),
            ~exists()
            .select_from(PatientIdNumIndexEntry.__table__)
            .where(
                and_(
                    PatientIdNum._pk == PatientIdNumIndexEntry.idnum_pk,
                    PatientIdNum.which_idnum
                    == PatientIdNumIndexEntry.which_idnum,
                    PatientIdNum.idnum_value
                    == PatientIdNumIndexEntry.idnum_value,
                )
            ),
        )
        for orig in q_original_without_idx:
            log.error("ID number without index entry: {!r}", orig)
            ok = False
            if not show_all_bad:
                return ok

        return ok

    # -------------------------------------------------------------------------
    # Update index at the point of upload from a device
    # -------------------------------------------------------------------------

    @classmethod
    def update_idnum_index_for_upload(
        cls,
        session: SqlASession,
        indexed_at_utc: Pendulum,
        tablechanges: UploadTableChanges,
    ) -> None:
        """
        Updates the index for a device's upload.

        - Deletes index entries for records that are on the way out.
        - Creates index entries for records that are on the way in.
        - Should be called after both the Patient and PatientIdNum tables are
          committed; see special ordering in
          :func:`camcops_server.cc_modules.client_api.commit_all`.

        Args:
            session:
                an SQLAlchemy Session
            indexed_at_utc:
                current time in UTC
            tablechanges:
                a :class:`camcops_server.cc_modules.cc_client_api_core.UploadTableChanges`
                object describing the changes to a table
        """  # noqa
        # noinspection PyUnresolvedReferences
        indextable = PatientIdNumIndexEntry.__table__  # type: ignore[assignment]  # noqa: E501
        indexcols = indextable.columns
        # noinspection PyUnresolvedReferences
        idnumtable = PatientIdNum.__table__  # type: ignore[assignment]
        idnumcols = idnumtable.columns
        # noinspection PyUnresolvedReferences
        patienttable = Patient.__table__  # type: ignore[assignment]
        patientcols = patienttable.columns

        # Delete the old
        removal_pks = tablechanges.idnum_delete_index_pks
        if removal_pks:
            log.debug(
                "Deleting old ID number indexes: server PKs {}", removal_pks
            )
            session.execute(
                indextable.delete().where(  # type: ignore[attr-defined]
                    indexcols.idnum_pk.in_(removal_pks)
                )
            )

        # Create the new
        addition_pks = tablechanges.idnum_add_index_pks
        if addition_pks:
            # As in rebuild_idnum_index(): bind a plain datetime.datetime,
            # because SQLite has trouble with a Pendulum literal.
            indexed_at_dt = pendulum_to_datetime(indexed_at_utc)
            select_fields: list[ColumnElement[Any]] = [
                idnumcols._pk,
                literal(indexed_at_dt),
                patientcols._pk,
                idnumcols.which_idnum,
                idnumcols.idnum_value,
            ]

            log.debug("Adding ID number indexes: server PKs {}", addition_pks)
            # noinspection PyPep8,PyProtectedMember
            session.execute(
                indextable.insert().from_select(  # type: ignore[attr-defined]
                    # Target:
                    [
                        indexcols.idnum_pk,
                        indexcols.indexed_at_utc,
                        indexcols.patient_pk,
                        indexcols.which_idnum,
                        indexcols.idnum_value,
                    ],
                    # Source:
                    (
                        select(*select_fields)
                        .select_from(
                            join(
                                idnumtable,
                                patienttable,
                                and_(
                                    idnumcols._device_id
                                    == patientcols._device_id,
                                    idnumcols._era == patientcols._era,
                                    idnumcols.patient_id == patientcols.id,
                                ),
                            )
                        )
                        .where(idnumcols._pk.in_(addition_pks))
                        .where(patientcols._current == True)  # noqa: E712
                    ),
                )
            )

476 

477 

478# ============================================================================= 

479# TaskIndexEntry 

480# ============================================================================= 

481 

482 

class TaskIndexEntry(Base):
    """
    Represents a server index entry for a
    :class:`camcops_server.cc_modules.cc_task.Task`.

    - Only current tasks are indexed. This simplifies direct linking to patient
      PKs.
    """

    __tablename__ = "_task_index"

    index_entry_pk: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key of this index entry",
    )
    indexed_at_utc: Mapped[datetime.datetime] = mapped_column(
        comment="When this index entry was created",
    )

    # The next two fields link to our task:
    task_table_name: Mapped[Optional[str]] = mapped_column(
        TableNameColType,
        index=True,
        comment="Table name of the task's base table",
    )
    task_pk: Mapped[Optional[int]] = mapped_column(
        index=True,
        comment="Server primary key of the task",
    )
    # We can probably even represent this with an SQLAlchemy ORM relationship.
    # This is polymorphic loading (we'll return objects of different types)
    # based on concrete table inheritance (each type of object -- each task --
    # has its own standalone table).
    # However, there are warnings about the inefficiency of this; see
    # https://docs.sqlalchemy.org/en/latest/orm/inheritance.html#concrete-table-inheritance
    # and we are trying to be efficient. So let's do it via the "task"
    # property below.

    # This links to the task's patient, if there is one:
    # noinspection PyProtectedMember
    patient_pk: Mapped[Optional[int]] = mapped_column(
        ForeignKey(Patient._pk),
        index=True,
        comment="Server primary key of the patient (if applicable)",
    )

    # These fields allow us to filter tasks efficiently:
    device_id: Mapped[int] = mapped_column(
        ForeignKey("_security_devices.id"),
        index=True,
        comment="ID of the source tablet device",
    )
    era: Mapped[str] = mapped_column(
        EraColType,
        index=True,
        comment="Era (_era) field of the source record",
    )
    when_created_utc: Mapped[datetime.datetime] = mapped_column(
        index=True,
        comment="Date/time this task instance was created (UTC)",
    )
    when_created_iso: Mapped[Pendulum] = mapped_column(
        PendulumDateTimeAsIsoTextColType,
        index=True,
        comment="Date/time this task instance was created (ISO 8601)",
    )  # Pendulum on the Python side
    when_added_batch_utc: Mapped[datetime.datetime] = mapped_column(
        index=True,
        comment="Date/time this task index was uploaded (UTC)",
    )
    adding_user_id: Mapped[Optional[int]] = mapped_column(
        ForeignKey("_security_users.id"),
        comment="ID of user that added this task",
    )
    group_id: Mapped[int] = mapped_column(
        ForeignKey("_security_groups.id"),
        index=True,
        comment="ID of group to which this task belongs",
    )
    task_is_complete: Mapped[bool] = mapped_column(
        comment="Is the task complete (as judged by the server when the index "
        "entry was created)?",
    )

    # Relationships:
    patient = relationship(Patient)
    _adding_user = relationship(User)

    def __repr__(self) -> str:
        return simple_repr(
            self,
            [
                "index_entry_pk",
                "task_table_name",
                "task_pk",
                "patient_pk",
                "device_id",
                "era",
                "when_created_utc",
                "when_created_iso",
                "when_added_batch_utc",
                "adding_user_id",
                "group_id",
                "task_is_complete",
            ],
        )

    # -------------------------------------------------------------------------
    # Fetch the task
    # -------------------------------------------------------------------------

    @property
    def task(self) -> Optional[Task]:
        """
        Returns:
            the associated :class:`camcops_server.cc_modules.cc_task.Task`, or
            ``None`` if none exists.

        Raises:
            :exc:`HTTPBadRequest` if the table doesn't exist
            :exc:`AssertionError` if this entry is not in a database session
        """
        dbsession = SqlASession.object_session(self)
        assert dbsession, (
            "TaskIndexEntry.task called on a TaskIndexEntry "
            "that's not yet in a database session"
        )
        return task_factory_unfiltered(
            dbsession, self.task_table_name, self.task_pk
        )

    # -------------------------------------------------------------------------
    # Other properties mirroring those of Task, for duck typing
    # -------------------------------------------------------------------------

    @property
    def is_anonymous(self) -> bool:
        """
        Is the task anonymous (i.e. has no linked patient)?
        """
        return self.patient_pk is None

    def is_complete(self) -> bool:
        """
        Is the task complete? (Cached at the time the index entry was made.)
        """
        return self.task_is_complete

    @property
    def _current(self) -> bool:
        """
        Only current tasks are indexed, so this always returns ``True``.
        """
        return True

    @property
    def pk(self) -> int:
        """
        Returns the task's server PK.
        """
        return self.task_pk

    @property
    def tablename(self) -> str:
        """
        Returns the base table name of the task.
        """
        return self.task_table_name

    @property
    def shortname(self) -> str:
        """
        Returns the task's shortname.
        """
        d = tablename_to_task_class_dict()
        taskclass = d[self.task_table_name]
        return taskclass.shortname

    def is_live_on_tablet(self) -> bool:
        """
        Is the task live on the source device (e.g. tablet)?
        """
        return self.era == ERA_NOW

    @property
    def when_created(self) -> Pendulum:
        """
        Returns the creation date/time as a Pendulum DateTime object.
        """
        return self.when_created_iso

    def any_patient_idnums_invalid(self, req: "CamcopsRequest") -> bool:
        """
        Do we have a patient who has any invalid ID numbers?

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        return any(
            not idnum.is_fully_valid(req)
            for idnum in self.get_patient_idnum_objects()
        )

    def get_patient_idnum_objects(self) -> List[PatientIdNum]:
        """
        Gets all :class:`PatientIdNum` objects for the patient (an empty list
        if there is no patient).
        """
        return self.patient.get_idnum_objects() if self.patient else []

    # -------------------------------------------------------------------------
    # Create
    # -------------------------------------------------------------------------

    @classmethod
    def make_from_task(
        cls, task: Task, indexed_at_utc: Pendulum
    ) -> "TaskIndexEntry":
        """
        Returns a task index entry for the specified
        :class:`camcops_server.cc_modules.cc_task.Task`. The
        returned index requires inserting into a database session.

        Args:
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task`
            indexed_at_utc:
                current time in UTC
        """
        assert indexed_at_utc is not None, "Missing indexed_at_utc"

        index = cls()

        index.indexed_at_utc = indexed_at_utc

        index.task_table_name = task.tablename
        index.task_pk = task.pk

        patient = task.patient
        index.patient_pk = patient.pk if patient else None

        index.device_id = task.device_id
        index.era = task.era
        index.when_created_utc = task.get_creation_datetime_utc()
        index.when_created_iso = task.when_created
        # noinspection PyProtectedMember
        index.when_added_batch_utc = task._when_added_batch_utc
        index.adding_user_id = task.get_adding_user_id()
        index.group_id = task.group_id
        index.task_is_complete = task.is_complete()

        return index

    @classmethod
    def index_task(
        cls, task: Task, session: SqlASession, indexed_at_utc: Pendulum
    ) -> None:
        """
        Indexes a task and adds the index to the database session.

        Args:
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task`
            session:
                an SQLAlchemy Session
            indexed_at_utc:
                current time in UTC
        """
        index = cls.make_from_task(task, indexed_at_utc=indexed_at_utc)
        session.add(index)

    @classmethod
    def unindex_task(cls, task: Task, session: SqlASession) -> None:
        """
        Removes a task index from the database.

        Args:
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task`
            session:
                an SQLAlchemy Session
        """
        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: ignore[assignment]  # type: Table
        idxcols = idxtable.columns
        tasktablename = task.__class__.tablename
        session.execute(
            idxtable.delete()  # type: ignore[attr-defined]
            .where(idxcols.task_table_name == tasktablename)
            .where(idxcols.task_pk == task.pk)
        )

    # -------------------------------------------------------------------------
    # Regenerate index
    # -------------------------------------------------------------------------

    @classmethod
    def rebuild_index_for_task_type(
        cls,
        session: SqlASession,
        taskclass: Type[Task],
        indexed_at_utc: Pendulum,
        delete_first: bool = True,
    ) -> None:
        """
        Rebuilds the index for a particular task type.

        Args:
            session: an SQLAlchemy Session
            taskclass: a subclass of
                :class:`camcops_server.cc_modules.cc_task.Task`
            indexed_at_utc: current time in UTC
            delete_first: delete old index entries first? Should always be True
                unless called as part of a master rebuild that deletes
                everything first.
        """
        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: ignore[assignment]  # type: Table
        idxcols = idxtable.columns
        tasktablename = taskclass.tablename
        log.info("Rebuilding task index for {}", tasktablename)
        # Delete all entries for this task. (The index column is
        # "task_table_name"; there is no "table_name" column.)
        if delete_first:
            session.execute(
                idxtable.delete().where(  # type: ignore[attr-defined]
                    idxcols.task_table_name == tasktablename
                )
            )
        # Create new entries
        # noinspection PyPep8,PyUnresolvedReferences,PyProtectedMember
        q = (
            session.query(taskclass)
            .filter(taskclass._current == True)  # noqa: E712
            .order_by(isotzdatetime_to_utcdatetime(taskclass.when_created))
        )
        for task in q:
            cls.index_task(task, session, indexed_at_utc)

    @classmethod
    def rebuild_entire_task_index(
        cls,
        session: SqlASession,
        indexed_at_utc: Pendulum,
        skip_tasks_with_missing_tables: bool = False,
    ) -> None:
        """
        Rebuilds the entire index.

        Args:
            session: an SQLAlchemy Session
            indexed_at_utc: current time in UTC
            skip_tasks_with_missing_tables: should we skip over tasks if their
                tables are not in the database? (This is so we can rebuild an
                index from a database upgrade, but not crash because newer
                tasks haven't had their tables created yet.)
        """
        log.info("Rebuilding entire task index")
        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: ignore[assignment]  # type: Table

        # Delete all entries
        with if_sqlserver_disable_constraints_triggers(
            session, idxtable.name  # type: ignore[attr-defined]
        ):
            session.execute(idxtable.delete())  # type: ignore[attr-defined]

        # Now rebuild:
        taskclasses = Task.all_subclasses_by_tablename()
        if skip_tasks_with_missing_tables:
            # Fetch the engine once, rather than once per task class.
            engine = get_engine_from_session(session)
            taskclasses = [
                tc for tc in taskclasses if table_exists(engine, tc.tablename)
            ]
        for taskclass in taskclasses:
            cls.rebuild_index_for_task_type(
                session, taskclass, indexed_at_utc, delete_first=False
            )

    # -------------------------------------------------------------------------
    # Update index at the point of upload from a device
    # -------------------------------------------------------------------------

    @classmethod
    def update_task_index_for_upload(
        cls,
        session: SqlASession,
        tablechanges: UploadTableChanges,
        indexed_at_utc: Pendulum,
    ) -> None:
        """
        Updates the index for a device's upload.

        - Deletes index entries for records that are on the way out.
        - Creates index entries for records that are on the way in.
        - Deletes/recreates index entries for records being preserved.

        Args:
            session:
                an SQLAlchemy Session
            tablechanges:
                a :class:`camcops_server.cc_modules.cc_client_api_core.UploadTableChanges`
                object describing the changes to a table
            indexed_at_utc:
                current time in UTC
        """  # noqa
        tasktablename = tablechanges.tablename
        d = tablename_to_task_class_dict()
        try:
            taskclass = d[tasktablename]  # may raise KeyError
        except KeyError:
            fail_user_error(f"Bug: no such task table: {tasktablename!r}")

        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: ignore[assignment]  # type: Table
        idxcols = idxtable.columns

        # Delete the old.
        delete_index_pks = tablechanges.task_delete_index_pks
        if delete_index_pks:
            log.debug(
                "Deleting old task indexes: {}, server PKs {}",
                tasktablename,
                delete_index_pks,
            )
            # noinspection PyProtectedMember
            session.execute(
                idxtable.delete()  # type: ignore[attr-defined]
                .where(idxcols.task_table_name == tasktablename)
                .where(idxcols.task_pk.in_(delete_index_pks))
            )

        # Create the new.
        reindex_pks = tablechanges.task_reindex_pks
        if reindex_pks:
            log.debug(
                "Recreating task indexes: {}, server PKs {}",
                tasktablename,
                reindex_pks,
            )
            # noinspection PyUnboundLocalVariable,PyProtectedMember
            q = session.query(taskclass).filter(taskclass._pk.in_(reindex_pks))
            for task in q:
                cls.index_task(task, session, indexed_at_utc=indexed_at_utc)

    # -------------------------------------------------------------------------
    # Check index
    # -------------------------------------------------------------------------
    @classmethod
    def check_index(
        cls, session: SqlASession, show_all_bad: bool = False
    ) -> bool:
        """
        Checks the index.

        Two checks are made for each task type: (1) every index entry refers
        to an existing current task; (2) every current task has an index
        entry. The patient link is not checked.

        Args:
            session:
                an SQLAlchemy Session
            show_all_bad:
                show all bad entries? (If false, return upon the first)

        Returns:
            bool: is the index OK?
        """
        ok = True

        log.info("Checking all task indexes represent valid entries")
        for taskclass in Task.all_subclasses_by_tablename():
            tasktablename = taskclass.tablename
            log.debug("Checking {}", tasktablename)
            # noinspection PyUnresolvedReferences,PyProtectedMember
            q_idx_without_original = session.query(TaskIndexEntry).filter(
                TaskIndexEntry.task_table_name == tasktablename,
                ~exists()
                .select_from(taskclass.__table__)
                .where(
                    and_(
                        TaskIndexEntry.task_pk == taskclass._pk,
                        taskclass._current == True,  # noqa: E712
                    )
                ),
            )
            # No check for a valid patient at this time.
            for index in q_idx_without_original:
                log.error("Task index without matching original: {!r}", index)
                ok = False
                if not show_all_bad:
                    return ok

        log.info("Checking all tasks have an index")
        for taskclass in Task.all_subclasses_by_tablename():
            tasktablename = taskclass.tablename
            log.debug("Checking {}", tasktablename)
            # noinspection PyUnresolvedReferences,PyProtectedMember
            q_original_without_idx = session.query(taskclass).filter(
                taskclass._current == True,  # noqa: E712
                ~exists()
                .select_from(TaskIndexEntry.__table__)
                .where(
                    and_(
                        TaskIndexEntry.task_pk == taskclass._pk,
                        TaskIndexEntry.task_table_name == tasktablename,
                    )
                ),
            )
            for orig in q_original_without_idx:
                log.error("Task without index entry: {!r}", orig)
                ok = False
                if not show_all_bad:
                    return ok

        return ok

990 

991 

992# ============================================================================= 

993# Wide-ranging index update functions 

994# ============================================================================= 

995 

996 

997def reindex_everything( 

998 session: SqlASession, skip_tasks_with_missing_tables: bool = False 

999) -> None: 

1000 """ 

1001 Deletes from and rebuilds all server index tables. 

1002 

1003 Args: 

1004 session: an SQLAlchemy Session 

1005 skip_tasks_with_missing_tables: should we skip over tasks if their 

1006 tables are not in the database? (This is so we can rebuild an index 

1007 from a database upgrade, but not crash because newer tasks haven't 

1008 had their tables created yet.) 

1009 """ 

1010 now = Pendulum.utcnow() 

1011 log.info("Reindexing database; indexed_at_utc = {}", now) 

1012 PatientIdNumIndexEntry.rebuild_idnum_index(session, now) 

1013 TaskIndexEntry.rebuild_entire_task_index( 

1014 session, 

1015 now, 

1016 skip_tasks_with_missing_tables=skip_tasks_with_missing_tables, 

1017 ) 

1018 

1019 

def update_indexes_and_push_exports(
    req: "CamcopsRequest",
    batchdetails: BatchDetails,
    tablechanges: UploadTableChanges,
) -> None:
    """
    Brings the server indexes up to date after a table upload, and queues
    "push" exports of newly arrived tasks.

    - For the patient ID number table, the ID number index is updated.
    - For a task table, the task index is updated. Then, for every push
      export recipient, an export request is queued for each task PK that
      ``tablechanges`` reports as due for that recipient.
    - Any other table is left alone.

    Args:
        req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        batchdetails: the :class:`BatchDetails`; its ``batchtime`` is used as
            the "indexed at" time
        tablechanges:
            a :class:`camcops_server.cc_modules.cc_client_api_core.UploadTableChanges`
            object describing the changes to a table
    """  # noqa
    tablename = tablechanges.tablename

    if tablename == PatientIdNum.__tablename__:
        # ID number table: only the ID number index needs updating.
        PatientIdNumIndexEntry.update_idnum_index_for_upload(
            session=req.dbsession,
            indexed_at_utc=batchdetails.batchtime,
            tablechanges=tablechanges,
        )
        return

    if tablename not in all_task_tablenames():
        # Neither the ID number table nor a task table: nothing to do.
        return

    # Task table: refresh its index entries first...
    TaskIndexEntry.update_task_index_for_upload(
        session=req.dbsession,
        tablechanges=tablechanges,
        indexed_at_utc=batchdetails.batchtime,
    )

    # ... then queue push exports, recipient by recipient.
    push_recipients = req.all_push_recipients
    group_id = req.user.upload_group_id
    for push_recipient in push_recipients:
        name = push_recipient.recipient_name
        due_pks = tablechanges.get_task_push_export_pks(
            recipient=push_recipient, uploading_group_id=group_id
        )
        for task_pk in due_pks:
            # Only queued here; transmitted *after* the request performs
            # COMMIT.
            req.add_export_push_request(name, tablename, task_pk)

1062 

1063 

def check_indexes(session: SqlASession, show_all_bad: bool = False) -> bool:
    """
    Checks both server index tables and logs a verdict for each: first the
    patient ID number index, then the task index.

    Args:
        session:
            an SQLAlchemy Session
        show_all_bad:
            if true, check and report on everything; if false, stop at the
            first bad entry (so a bad ID number index means the task index
            is not checked at all)

    Returns:
        bool: ``True`` if both indexes are OK, otherwise ``False``
    """
    idnum_index_ok = PatientIdNumIndexEntry.check_index(session, show_all_bad)
    if not idnum_index_ok:
        log.error("Patient ID number index is bad")
        if not show_all_bad:
            return False
    else:
        log.info("Patient ID number index is good")

    task_index_ok = TaskIndexEntry.check_index(session, show_all_bad)
    if not task_index_ok:
        log.error("Task index is bad")
    else:
        log.info("Task index is good")

    return idnum_index_ok and task_index_ok