Coverage for cc_modules/cc_taskindex.py: 56%
294 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
1"""
2camcops_server/cc_modules/cc_taskindex.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Server-side task index.**
28Note in particular that if you, as a developer, change the ``is_complete()``
29criteria for a task, you should cause the server index to be rebuilt (because
30it caches ``is_complete()`` information).
32"""
34import datetime
35import logging
36from typing import Any, List, Optional, Type, TYPE_CHECKING
38from cardinal_pythonlib.datetimefunc import pendulum_to_datetime
39from cardinal_pythonlib.logs import BraceStyleAdapter
40from cardinal_pythonlib.reprfunc import simple_repr
41from cardinal_pythonlib.sqlalchemy.session import get_engine_from_session
42from cardinal_pythonlib.sqlalchemy.schema import table_exists
43from cardinal_pythonlib.sqlalchemy.sqlserver import (
44 if_sqlserver_disable_constraints_triggers,
45)
46from pendulum import DateTime as Pendulum
47import pyramid.httpexceptions as exc
48from sqlalchemy.orm import (
49 Mapped,
50 mapped_column,
51 relationship,
52 Session as SqlASession,
53)
54from sqlalchemy.sql.expression import and_, exists, join, literal, select
55from sqlalchemy.sql.schema import ForeignKey
56from sqlalchemy.sql.sqltypes import BigInteger
58from camcops_server.cc_modules.cc_client_api_core import (
59 BatchDetails,
60 fail_user_error,
61 UploadTableChanges,
62)
63from camcops_server.cc_modules.cc_constants import ERA_NOW
64from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition
65from camcops_server.cc_modules.cc_patient import Patient
66from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
67from camcops_server.cc_modules.cc_sqla_coltypes import (
68 EraColType,
69 isotzdatetime_to_utcdatetime,
70 PendulumDateTimeAsIsoTextColType,
71 TableNameColType,
72)
73from camcops_server.cc_modules.cc_sqlalchemy import Base
74from camcops_server.cc_modules.cc_task import (
75 all_task_tablenames,
76 tablename_to_task_class_dict,
77 Task,
78)
79from camcops_server.cc_modules.cc_user import User
81if TYPE_CHECKING:
82 from sqlalchemy.sql.elements import ColumnElement
84 from camcops_server.cc_modules.cc_request import CamcopsRequest
86log = BraceStyleAdapter(logging.getLogger(__name__))
89# =============================================================================
90# Helper functions
91# =============================================================================
def task_factory_unfiltered(
    dbsession: SqlASession, basetable: str, serverpk: int
) -> Optional[Task]:
    """
    Fetch one task by its base table name and server PK.

    No permission filtering is applied. (Used by
    :class:`camcops_server.cc_modules.cc_taskindex.TaskIndexEntry`.)

    Args:
        dbsession: a :class:`sqlalchemy.orm.session.Session`
        basetable: name of the task's base table
        serverpk: server PK of the task

    Returns:
        the task, or ``None`` if no row has that PK

    Raises:
        :exc:`HTTPBadRequest` if ``basetable`` is not a known task table
    """
    task_classes = tablename_to_task_class_dict()
    try:
        task_class = task_classes[basetable]
    except KeyError:
        # Unknown table name: report as a bad request.
        raise exc.HTTPBadRequest(f"No such task table: {basetable!r}")
    # noinspection PyProtectedMember
    return (
        dbsession.query(task_class)
        .filter(task_class._pk == serverpk)
        .first()
    )
123# =============================================================================
124# PatientIdNumIndexEntry
125# =============================================================================
class PatientIdNumIndexEntry(Base):
    """
    Represents a server index entry for a
    :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`.

    - Only current ID numbers are indexed.
    """

    __tablename__ = "_idnum_index"

    idnum_pk: Mapped[int] = mapped_column(
        primary_key=True,
        index=True,
        comment="Server primary key of the PatientIdNum "
        "(and of the PatientIdNumIndexEntry)",
    )
    indexed_at_utc: Mapped[datetime.datetime] = mapped_column(
        comment="When this index entry was created",
    )

    # noinspection PyProtectedMember
    patient_pk: Mapped[Optional[int]] = mapped_column(
        ForeignKey(Patient._pk),
        index=True,
        comment="Server primary key of the Patient",
    )
    which_idnum: Mapped[int] = mapped_column(
        ForeignKey(IdNumDefinition.which_idnum),
        index=True,
        comment="Which of the server's ID numbers is this?",
    )
    idnum_value: Mapped[Optional[int]] = mapped_column(
        BigInteger, comment="The value of the ID number"
    )

    # Relationships:
    patient = relationship(Patient)

    def __repr__(self) -> str:
        return simple_repr(
            self, ["idnum_pk", "patient_pk", "which_idnum", "idnum_value"]
        )

    # -------------------------------------------------------------------------
    # Create
    # -------------------------------------------------------------------------

    @classmethod
    def make_from_idnum(cls, idnum: PatientIdNum) -> "PatientIdNumIndexEntry":
        """
        Returns an ID index entry for the specified
        :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`. The
        returned index requires inserting into a database session.

        The entry is timestamped with the current time in UTC.
        """
        # noinspection PyProtectedMember
        assert idnum._current, "Only index current PatientIdNum objects"
        index = cls()
        index.idnum_pk = idnum.pk
        index.patient_pk = idnum.get_patient_server_pk()
        index.which_idnum = idnum.which_idnum
        index.idnum_value = idnum.idnum_value
        # The column is "indexed_at_utc", so use UTC rather than local time
        # (matching reindex_everything(), which uses Pendulum.utcnow()).
        index.indexed_at_utc = Pendulum.utcnow()
        return index

    @classmethod
    def index_idnum(cls, idnum: PatientIdNum, session: SqlASession) -> None:
        """
        Indexes an ID number and inserts the index into the database.

        Args:
            idnum: a
                :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`
            session:
                an SQLAlchemy Session
        """
        index = cls.make_from_idnum(idnum)
        session.add(index)

    @classmethod
    def unindex_patient(cls, patient: Patient, session: SqlASession) -> None:
        """
        Removes all ID number indexes from the database for a patient.

        Args:
            patient:
                :class:`camcops_server.cc_modules.cc_patient.Patient`
            session:
                an SQLAlchemy Session
        """

        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: ignore[assignment]  # type: Table
        idxcols = idxtable.columns
        # noinspection PyProtectedMember
        session.execute(
            idxtable.delete().where(idxcols.patient_pk == patient._pk)  # type: ignore[attr-defined]  # noqa: E501
        )

    # -------------------------------------------------------------------------
    # Regenerate index
    # -------------------------------------------------------------------------

    @classmethod
    def rebuild_idnum_index(
        cls, session: SqlASession, indexed_at_utc: Pendulum
    ) -> None:
        """
        Rebuilds the index entirely. Uses SQLAlchemy Core (not ORM) for speed.

        All existing index rows are deleted, then one row is inserted (via
        INSERT ... FROM SELECT) for every current PatientIdNum that has a
        non-NULL value and belongs to a current Patient.

        Args:
            session: an SQLAlchemy Session
            indexed_at_utc: current time in UTC
        """
        log.info("Rebuilding patient ID number index")

        # noinspection PyUnresolvedReferences
        indextable = PatientIdNumIndexEntry.__table__  # type: ignore[assignment]  # noqa: E501
        indexcols = indextable.columns
        # noinspection PyUnresolvedReferences
        idnumtable = PatientIdNum.__table__  # type: ignore[assignment]
        idnumcols = idnumtable.columns
        # noinspection PyUnresolvedReferences
        patienttable = Patient.__table__  # type: ignore[assignment]
        patientcols = patienttable.columns

        # Delete all entries
        with if_sqlserver_disable_constraints_triggers(
            session, indextable.name  # type: ignore[attr-defined]
        ):
            session.execute(indextable.delete())  # type: ignore[attr-defined]

        # Create new ones
        indexed_at_utc = pendulum_to_datetime(indexed_at_utc)
        # ... SQLite has trouble otherwise
        # noinspection PyProtectedMember,PyPep8
        session.execute(
            indextable.insert().from_select(  # type: ignore[attr-defined]
                # Target:
                [
                    indexcols.idnum_pk,
                    indexcols.indexed_at_utc,
                    indexcols.patient_pk,
                    indexcols.which_idnum,
                    indexcols.idnum_value,
                ],
                # Source:
                (
                    select(
                        idnumcols._pk,
                        literal(indexed_at_utc),
                        patientcols._pk,
                        idnumcols.which_idnum,
                        idnumcols.idnum_value,
                    )
                    .select_from(
                        join(
                            idnumtable,
                            patienttable,
                            and_(
                                idnumcols._device_id == patientcols._device_id,
                                idnumcols._era == patientcols._era,
                                idnumcols.patient_id == patientcols.id,
                            ),
                        )
                    )
                    .where(idnumcols._current == True)  # noqa: E712
                    .where(idnumcols.idnum_value.isnot(None))
                    .where(patientcols._current == True)  # noqa: E712
                ),
            )
        )

    # -------------------------------------------------------------------------
    # Check index
    # -------------------------------------------------------------------------
    @classmethod
    def check_index(
        cls, session: SqlASession, show_all_bad: bool = False
    ) -> bool:
        """
        Checks the index.

        Two checks are made: (1) every index entry corresponds to a current
        PatientIdNum joined to a current Patient with matching values; (2)
        every current PatientIdNum with a non-NULL value has an index entry.

        Args:
            session:
                an SQLAlchemy Session
            show_all_bad:
                show all bad entries? (If false, return upon the first)

        Returns:
            bool: is the index OK?
        """
        ok = True

        log.info(
            "Checking all patient ID number indexes represent valid entries"
        )
        # noinspection PyUnresolvedReferences,PyProtectedMember
        q_idx_without_original = session.query(PatientIdNumIndexEntry).filter(
            ~exists()
            .select_from(
                PatientIdNum.__table__.join(
                    Patient.__table__,
                    # All three conditions must form ONE on-clause. Passed as
                    # separate positional arguments, the second and third
                    # would be taken as the "isouter" and "full" parameters
                    # of join() and so never reach the ON clause.
                    and_(
                        Patient.id == PatientIdNum.patient_id,
                        Patient._device_id == PatientIdNum._device_id,
                        Patient._era == PatientIdNum._era,
                    ),
                )
            )
            .where(
                and_(
                    PatientIdNum._pk == PatientIdNumIndexEntry.idnum_pk,
                    PatientIdNum._current == True,  # noqa: E712
                    PatientIdNum.which_idnum
                    == PatientIdNumIndexEntry.which_idnum,
                    PatientIdNum.idnum_value
                    == PatientIdNumIndexEntry.idnum_value,
                    Patient._pk == PatientIdNumIndexEntry.patient_pk,
                    Patient._current == True,  # noqa: E712
                )
            )
        )
        for index in q_idx_without_original:
            log.error(
                "Patient ID number index without matching original: {!r}",
                index,
            )
            ok = False
            if not show_all_bad:
                return ok

        log.info("Checking all patient ID numbers have an index")
        # noinspection PyUnresolvedReferences,PyProtectedMember
        q_original_without_idx = session.query(PatientIdNum).filter(
            PatientIdNum._current == True,  # noqa: E712
            PatientIdNum.idnum_value.isnot(None),
            ~exists()
            .select_from(PatientIdNumIndexEntry.__table__)
            .where(
                and_(
                    PatientIdNum._pk == PatientIdNumIndexEntry.idnum_pk,
                    PatientIdNum.which_idnum
                    == PatientIdNumIndexEntry.which_idnum,
                    PatientIdNum.idnum_value
                    == PatientIdNumIndexEntry.idnum_value,
                )
            ),
        )
        for orig in q_original_without_idx:
            log.error("ID number without index entry: {!r}", orig)
            ok = False
            if not show_all_bad:
                return ok

        return ok

    # -------------------------------------------------------------------------
    # Update index at the point of upload from a device
    # -------------------------------------------------------------------------

    @classmethod
    def update_idnum_index_for_upload(
        cls,
        session: SqlASession,
        indexed_at_utc: Pendulum,
        tablechanges: UploadTableChanges,
    ) -> None:
        """
        Updates the index for a device's upload.

        - Deletes index entries for records that are on the way out.
        - Creates index entries for records that are on the way in.
        - Should be called after both the Patient and PatientIdNum tables are
          committed; see special ordering in
          :func:`camcops_server.cc_modules.client_api.commit_all`.

        Args:
            session:
                an SQLAlchemy Session
            indexed_at_utc:
                current time in UTC
            tablechanges:
                a :class:`camcops_server.cc_modules.cc_client_api_core.UploadTableChanges`
                object describing the changes to a table
        """  # noqa
        # noinspection PyUnresolvedReferences
        indextable = PatientIdNumIndexEntry.__table__  # type: ignore[assignment]  # noqa: E501
        indexcols = indextable.columns
        # noinspection PyUnresolvedReferences
        idnumtable = PatientIdNum.__table__  # type: ignore[assignment]
        idnumcols = idnumtable.columns
        # noinspection PyUnresolvedReferences
        patienttable = Patient.__table__  # type: ignore[assignment]
        patientcols = patienttable.columns

        # Delete the old
        removal_pks = tablechanges.idnum_delete_index_pks
        if removal_pks:
            log.debug(
                "Deleting old ID number indexes: server PKs {}", removal_pks
            )
            session.execute(
                indextable.delete().where(  # type: ignore[attr-defined]
                    indextable.c.idnum_pk.in_(removal_pks)
                )
            )

        # Create the new
        addition_pks = tablechanges.idnum_add_index_pks
        if addition_pks:
            # Convert to a plain datetime before binding it as a literal, as
            # rebuild_idnum_index() does ("SQLite has trouble otherwise").
            indexed_at = pendulum_to_datetime(indexed_at_utc)
            select_fields: list[ColumnElement[Any]] = [
                idnumcols._pk,
                literal(indexed_at),
                patientcols._pk,
                idnumcols.which_idnum,
                idnumcols.idnum_value,
            ]

            log.debug("Adding ID number indexes: server PKs {}", addition_pks)
            # noinspection PyPep8,PyProtectedMember
            session.execute(
                indextable.insert().from_select(  # type: ignore[attr-defined]
                    # Target:
                    [
                        indexcols.idnum_pk,
                        indexcols.indexed_at_utc,
                        indexcols.patient_pk,
                        indexcols.which_idnum,
                        indexcols.idnum_value,
                    ],
                    # Source:
                    (
                        select(*select_fields)
                        .select_from(
                            join(
                                idnumtable,
                                patienttable,
                                and_(
                                    idnumcols._device_id
                                    == patientcols._device_id,
                                    idnumcols._era == patientcols._era,
                                    idnumcols.patient_id == patientcols.id,
                                ),
                            )
                        )
                        .where(idnumcols._pk.in_(addition_pks))
                        .where(patientcols._current == True)  # noqa: E712
                    ),
                )
            )
478# =============================================================================
479# TaskIndexEntry
480# =============================================================================
class TaskIndexEntry(Base):
    """
    Represents a server index entry for a
    :class:`camcops_server.cc_modules.cc_task.Task`.

    - Only current tasks are indexed. This simplifies direct linking to patient
      PKs.
    """

    __tablename__ = "_task_index"

    index_entry_pk: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key of this index entry",
    )
    indexed_at_utc: Mapped[datetime.datetime] = mapped_column(
        comment="When this index entry was created",
    )

    # The next two fields link to our task:
    task_table_name: Mapped[Optional[str]] = mapped_column(
        TableNameColType,
        index=True,
        comment="Table name of the task's base table",
    )
    task_pk: Mapped[Optional[int]] = mapped_column(
        index=True,
        comment="Server primary key of the task",
    )
    # We can probably even represent this with an SQLAlchemy ORM relationship.
    # This is polymorphic loading (we'll return objects of different types)
    # based on concrete table inheritance (each type of object -- each task --
    # has its own standalone table).
    # However, there are warnings about the inefficiency of this; see
    # https://docs.sqlalchemy.org/en/latest/orm/inheritance.html#concrete-table-inheritance
    # and we are trying to be efficient. So let's do via task() below.

    # This links to the task's patient, if there is one:
    # noinspection PyProtectedMember
    patient_pk: Mapped[Optional[int]] = mapped_column(
        ForeignKey(Patient._pk),
        index=True,
        comment="Server primary key of the patient (if applicable)",
    )

    # These fields allow us to filter tasks efficiently:
    device_id: Mapped[int] = mapped_column(
        ForeignKey("_security_devices.id"),
        index=True,
        comment="ID of the source tablet device",
    )
    era: Mapped[str] = mapped_column(
        EraColType,
        index=True,
        comment="Era (_era) field of the source record",
    )
    when_created_utc: Mapped[datetime.datetime] = mapped_column(
        index=True,
        comment="Date/time this task instance was created (UTC)",
    )
    when_created_iso: Mapped[Pendulum] = mapped_column(
        PendulumDateTimeAsIsoTextColType,
        index=True,
        comment="Date/time this task instance was created (ISO 8601)",
    )  # Pendulum on the Python side
    when_added_batch_utc: Mapped[datetime.datetime] = mapped_column(
        index=True,
        comment="Date/time this task index was uploaded (UTC)",
    )
    adding_user_id: Mapped[Optional[int]] = mapped_column(
        ForeignKey("_security_users.id"),
        comment="ID of user that added this task",
    )
    group_id: Mapped[int] = mapped_column(
        ForeignKey("_security_groups.id"),
        index=True,
        comment="ID of group to which this task belongs",
    )
    task_is_complete: Mapped[bool] = mapped_column(
        comment="Is the task complete (as judged by the server when the index "
        "entry was created)?",
    )

    # Relationships:
    patient = relationship(Patient)
    _adding_user = relationship(User)

    def __repr__(self) -> str:
        return simple_repr(
            self,
            [
                "index_entry_pk",
                "task_table_name",
                "task_pk",
                "patient_pk",
                "device_id",
                "era",
                "when_created_utc",
                "when_created_iso",
                "when_added_batch_utc",
                "adding_user_id",
                "group_id",
                "task_is_complete",
            ],
        )

    # -------------------------------------------------------------------------
    # Fetch the task
    # -------------------------------------------------------------------------

    @property
    def task(self) -> Optional[Task]:
        """
        Returns:
            the associated :class:`camcops_server.cc_modules.cc_task.Task`, or
            ``None`` if none exists.

        Raises:
            :exc:`HTTPBadRequest` if the table doesn't exist
        """
        dbsession = SqlASession.object_session(self)
        assert dbsession, (
            "TaskIndexEntry.task called on a TaskIndexEntry "
            "that's not yet in a database session"
        )
        return task_factory_unfiltered(
            dbsession, self.task_table_name, self.task_pk
        )

    # -------------------------------------------------------------------------
    # Other properties mirroring those of Task, for duck typing
    # -------------------------------------------------------------------------

    @property
    def is_anonymous(self) -> bool:
        """
        Is the task anonymous? (True if the entry has no patient PK.)
        """
        return self.patient_pk is None

    def is_complete(self) -> bool:
        """
        Is the task complete? (Returns the value cached when the index entry
        was created.)
        """
        return self.task_is_complete

    @property
    def _current(self) -> bool:
        """
        All task index entries represent current tasks (only current tasks
        are indexed), so this always returns ``True``.
        """
        return True

    @property
    def pk(self) -> int:
        """
        Returns the task's server PK.
        """
        return self.task_pk

    @property
    def tablename(self) -> str:
        """
        Returns the base table name of the task.
        """
        return self.task_table_name

    @property
    def shortname(self) -> str:
        """
        Returns the task's shortname, looked up from the task class that
        corresponds to this entry's table name.
        """
        d = tablename_to_task_class_dict()
        taskclass = d[self.task_table_name]
        return taskclass.shortname

    def is_live_on_tablet(self) -> bool:
        """
        Is the task live on the source device (e.g. tablet)? True if the
        entry's era is ``ERA_NOW``.
        """
        return self.era == ERA_NOW

    @property
    def when_created(self) -> Pendulum:
        """
        Returns the creation date/time as a Pendulum DateTime object.
        """
        return self.when_created_iso

    def any_patient_idnums_invalid(self, req: "CamcopsRequest") -> bool:
        """
        Do we have a patient who has any invalid ID numbers?

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        return any(
            not idnum.is_fully_valid(req)
            for idnum in self.get_patient_idnum_objects()
        )

    def get_patient_idnum_objects(self) -> List[PatientIdNum]:
        """
        Gets all :class:`PatientIdNum` objects for the patient (an empty list
        if there is no patient).
        """
        return self.patient.get_idnum_objects() if self.patient else []

    # -------------------------------------------------------------------------
    # Create
    # -------------------------------------------------------------------------

    @classmethod
    def make_from_task(
        cls, task: Task, indexed_at_utc: Pendulum
    ) -> "TaskIndexEntry":
        """
        Returns a task index entry for the specified
        :class:`camcops_server.cc_modules.cc_task.Task`. The
        returned index requires inserting into a database session.

        Args:
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task`
            indexed_at_utc:
                current time in UTC
        """
        assert indexed_at_utc is not None, "Missing indexed_at_utc"

        index = cls()

        index.indexed_at_utc = indexed_at_utc

        index.task_table_name = task.tablename
        index.task_pk = task.pk

        patient = task.patient
        index.patient_pk = patient.pk if patient else None

        index.device_id = task.device_id
        index.era = task.era
        index.when_created_utc = task.get_creation_datetime_utc()
        index.when_created_iso = task.when_created
        # noinspection PyProtectedMember
        index.when_added_batch_utc = task._when_added_batch_utc
        index.adding_user_id = task.get_adding_user_id()
        index.group_id = task.group_id
        # Cached here; the index must be rebuilt if a task's is_complete()
        # criteria change.
        index.task_is_complete = task.is_complete()

        return index

    @classmethod
    def index_task(
        cls, task: Task, session: SqlASession, indexed_at_utc: Pendulum
    ) -> None:
        """
        Indexes a task and inserts the index into the database.

        Args:
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task`
            session:
                an SQLAlchemy Session
            indexed_at_utc:
                current time in UTC
        """
        index = cls.make_from_task(task, indexed_at_utc=indexed_at_utc)
        session.add(index)

    @classmethod
    def unindex_task(cls, task: Task, session: SqlASession) -> None:
        """
        Removes a task index from the database.

        Args:
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task`
            session:
                an SQLAlchemy Session
        """

        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: ignore[assignment]  # type: Table
        idxcols = idxtable.columns
        tasktablename = task.__class__.tablename
        session.execute(
            idxtable.delete()  # type: ignore[attr-defined]
            .where(idxcols.task_table_name == tasktablename)
            .where(idxcols.task_pk == task.pk)
        )

    # -------------------------------------------------------------------------
    # Regenerate index
    # -------------------------------------------------------------------------

    @classmethod
    def rebuild_index_for_task_type(
        cls,
        session: SqlASession,
        taskclass: Type[Task],
        indexed_at_utc: Pendulum,
        delete_first: bool = True,
    ) -> None:
        """
        Rebuilds the index for a particular task type.

        Args:
            session: an SQLAlchemy Session
            taskclass: a subclass of
                :class:`camcops_server.cc_modules.cc_task.Task`
            indexed_at_utc: current time in UTC
            delete_first: delete old index entries first? Should always be True
                unless called as part of a master rebuild that deletes
                everything first.
        """
        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: ignore[assignment]  # type: Table
        idxcols = idxtable.columns
        tasktablename = taskclass.tablename
        log.info("Rebuilding task index for {}", tasktablename)
        # Delete all entries for this task
        if delete_first:
            # The index column is "task_table_name" (there is no "table_name"
            # column on this table).
            session.execute(
                idxtable.delete().where(  # type: ignore[attr-defined]
                    idxcols.task_table_name == tasktablename
                )
            )
        # Create new entries
        # noinspection PyPep8,PyUnresolvedReferences,PyProtectedMember
        q = (
            session.query(taskclass)
            .filter(taskclass._current == True)  # noqa: E712
            .order_by(isotzdatetime_to_utcdatetime(taskclass.when_created))
        )
        for task in q:
            cls.index_task(task, session, indexed_at_utc)

    @classmethod
    def rebuild_entire_task_index(
        cls,
        session: SqlASession,
        indexed_at_utc: Pendulum,
        skip_tasks_with_missing_tables: bool = False,
    ) -> None:
        """
        Rebuilds the entire index.

        Args:
            session: an SQLAlchemy Session
            indexed_at_utc: current time in UTC
            skip_tasks_with_missing_tables: should we skip over tasks if their
                tables are not in the database? (This is so we can rebuild an
                index from a database upgrade, but not crash because newer
                tasks haven't had their tables created yet.)
        """
        log.info("Rebuilding entire task index")
        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: ignore[assignment]  # type: Table

        # Delete all entries
        with if_sqlserver_disable_constraints_triggers(session, idxtable.name):  # type: ignore[attr-defined]  # noqa: E501
            session.execute(idxtable.delete())  # type: ignore[attr-defined]

        # The engine is only needed for the table-existence check, and is the
        # same for every task class, so look it up once.
        engine = (
            get_engine_from_session(session)
            if skip_tasks_with_missing_tables
            else None
        )

        # Now rebuild:
        for taskclass in Task.all_subclasses_by_tablename():
            if skip_tasks_with_missing_tables and not table_exists(
                engine, taskclass.tablename
            ):
                continue
            # Everything was deleted above, so no per-task delete is needed.
            cls.rebuild_index_for_task_type(
                session, taskclass, indexed_at_utc, delete_first=False
            )

    # -------------------------------------------------------------------------
    # Update index at the point of upload from a device
    # -------------------------------------------------------------------------

    @classmethod
    def update_task_index_for_upload(
        cls,
        session: SqlASession,
        tablechanges: UploadTableChanges,
        indexed_at_utc: Pendulum,
    ) -> None:
        """
        Updates the index for a device's upload.

        - Deletes index entries for records that are on the way out.
        - Creates index entries for records that are on the way in.
        - Deletes/recreates index entries for records being preserved.

        Args:
            session:
                an SQLAlchemy Session
            tablechanges:
                a :class:`camcops_server.cc_modules.cc_client_api_core.UploadTableChanges`
                object describing the changes to a table
            indexed_at_utc:
                current time in UTC
        """  # noqa
        tasktablename = tablechanges.tablename
        d = tablename_to_task_class_dict()
        try:
            taskclass = d[tasktablename]  # may raise KeyError
        except KeyError:
            # NOTE(review): presumably fail_user_error() always raises, so
            # "taskclass" is never used unbound below -- confirm.
            fail_user_error(f"Bug: no such task table: {tasktablename!r}")

        # noinspection PyUnresolvedReferences
        idxtable = cls.__table__  # type: ignore[assignment]  # type: Table
        idxcols = idxtable.columns

        # Delete the old.
        delete_index_pks = tablechanges.task_delete_index_pks
        if delete_index_pks:
            log.debug(
                "Deleting old task indexes: {}, server PKs {}",
                tasktablename,
                delete_index_pks,
            )
            # noinspection PyProtectedMember
            session.execute(
                idxtable.delete()  # type: ignore[attr-defined]
                .where(idxcols.task_table_name == tasktablename)
                .where(idxcols.task_pk.in_(delete_index_pks))
            )

        # Create the new.
        reindex_pks = tablechanges.task_reindex_pks
        if reindex_pks:
            log.debug(
                "Recreating task indexes: {}, server PKs {}",
                tasktablename,
                reindex_pks,
            )
            # noinspection PyUnboundLocalVariable,PyProtectedMember
            q = session.query(taskclass).filter(taskclass._pk.in_(reindex_pks))
            for task in q:
                cls.index_task(task, session, indexed_at_utc=indexed_at_utc)

    # -------------------------------------------------------------------------
    # Check index
    # -------------------------------------------------------------------------
    @classmethod
    def check_index(
        cls, session: SqlASession, show_all_bad: bool = False
    ) -> bool:
        """
        Checks the index.

        Two checks are made for each task class: (1) every index entry for
        that task's table corresponds to a current task; (2) every current
        task has an index entry.

        Args:
            session:
                an SQLAlchemy Session
            show_all_bad:
                show all bad entries? (If false, return upon the first)

        Returns:
            bool: is the index OK?
        """
        ok = True

        log.info("Checking all task indexes represent valid entries")
        for taskclass in Task.all_subclasses_by_tablename():
            tasktablename = taskclass.tablename
            log.debug("Checking {}", tasktablename)
            # noinspection PyUnresolvedReferences,PyProtectedMember
            q_idx_without_original = session.query(TaskIndexEntry).filter(
                TaskIndexEntry.task_table_name == tasktablename,
                ~exists()
                .select_from(taskclass.__table__)
                .where(
                    and_(
                        TaskIndexEntry.task_pk == taskclass._pk,
                        taskclass._current == True,  # noqa: E712
                    )
                ),
            )
            # No check for a valid patient at this time.
            for index in q_idx_without_original:
                log.error("Task index without matching original: {!r}", index)
                ok = False
                if not show_all_bad:
                    return ok

        log.info("Checking all tasks have an index")
        for taskclass in Task.all_subclasses_by_tablename():
            tasktablename = taskclass.tablename
            log.debug("Checking {}", tasktablename)
            # noinspection PyUnresolvedReferences,PyProtectedMember
            q_original_without_idx = session.query(taskclass).filter(
                taskclass._current == True,  # noqa: E712
                ~exists()
                .select_from(TaskIndexEntry.__table__)
                .where(
                    and_(
                        TaskIndexEntry.task_pk == taskclass._pk,
                        TaskIndexEntry.task_table_name == tasktablename,
                    )
                ),
            )
            for orig in q_original_without_idx:
                log.error("Task without index entry: {!r}", orig)
                ok = False
                if not show_all_bad:
                    return ok

        return ok
992# =============================================================================
993# Wide-ranging index update functions
994# =============================================================================
def reindex_everything(
    session: SqlASession, skip_tasks_with_missing_tables: bool = False
) -> None:
    """
    Deletes from and rebuilds all server index tables, using a single
    UTC timestamp for every new index entry.

    Args:
        session: an SQLAlchemy Session
        skip_tasks_with_missing_tables: should we skip over tasks if their
            tables are not in the database? (This is so we can rebuild an index
            from a database upgrade, but not crash because newer tasks haven't
            had their tables created yet.)
    """
    indexed_at_utc = Pendulum.utcnow()
    log.info("Reindexing database; indexed_at_utc = {}", indexed_at_utc)

    # Patient ID numbers first, then tasks.
    PatientIdNumIndexEntry.rebuild_idnum_index(session, indexed_at_utc)
    TaskIndexEntry.rebuild_entire_task_index(
        session,
        indexed_at_utc,
        skip_tasks_with_missing_tables=skip_tasks_with_missing_tables,
    )
def update_indexes_and_push_exports(
    req: "CamcopsRequest",
    batchdetails: BatchDetails,
    tablechanges: UploadTableChanges,
) -> None:
    """
    Update server indexes, if required.

    Also triggers background jobs to export "new arrivals", if required.

    Tables other than the patient ID number table and the task tables are
    ignored.

    Args:
        req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        batchdetails: the :class:`BatchDetails`
        tablechanges:
            a :class:`camcops_server.cc_modules.cc_client_api_core.UploadTableChanges`
            object describing the changes to a table
    """  # noqa
    tablename = tablechanges.tablename

    if tablename == PatientIdNum.__tablename__:
        # ID number table: update the idnum index; nothing to export.
        PatientIdNumIndexEntry.update_idnum_index_for_upload(
            session=req.dbsession,
            indexed_at_utc=batchdetails.batchtime,
            tablechanges=tablechanges,
        )
        return

    if tablename not in all_task_tablenames():
        # Neither an ID number table nor a task table: nothing to do.
        return

    # Task table: update the task index...
    TaskIndexEntry.update_task_index_for_upload(
        session=req.dbsession,
        tablechanges=tablechanges,
        indexed_at_utc=batchdetails.batchtime,
    )

    # ... then queue push exports for each recipient.
    push_recipients = req.all_push_recipients
    group_id = req.user.upload_group_id
    for push_recipient in push_recipients:
        name = push_recipient.recipient_name
        export_pks = tablechanges.get_task_push_export_pks(
            recipient=push_recipient, uploading_group_id=group_id
        )
        for task_pk in export_pks:
            req.add_export_push_request(name, tablename, task_pk)
            # ... will be transmitted *after* the request performs COMMIT
def check_indexes(session: SqlASession, show_all_bad: bool = False) -> bool:
    """
    Checks all server index tables: the patient ID number index first, then
    the task index.

    Args:
        session:
            an SQLAlchemy Session
        show_all_bad:
            show all bad entries? (If false, return upon the first)

    Returns:
        bool: are the indexes OK?
    """
    idnum_index_ok = PatientIdNumIndexEntry.check_index(session, show_all_bad)
    if not idnum_index_ok:
        log.error("Patient ID number index is bad")
        if not show_all_bad:
            # Stop at the first failure; the task index is not checked.
            return False
    else:
        log.info("Patient ID number index is good")

    task_index_ok = TaskIndexEntry.check_index(session, show_all_bad)
    if not task_index_ok:
        log.error("Task index is bad")
    else:
        log.info("Task index is good")

    return idnum_index_ok and task_index_ok