Coverage for cc_modules/cc_dump.py: 23%

221 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

1""" 

2camcops_server/cc_modules/cc_dump.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Methods for providing a dump of data from the server to the web user.** 

27 

28""" 

29 

30import logging 

31from typing import ( 

32 Any, 

33 Dict, 

34 Generator, 

35 Iterable, 

36 List, 

37 Optional, 

38 Set, 

39 Tuple, 

40 Type, 

41 TYPE_CHECKING, 

42) 

43 

44from cardinal_pythonlib.logs import BraceStyleAdapter 

45from cardinal_pythonlib.sqlalchemy.orm_inspect import ( 

46 gen_columns, 

47 gen_orm_classes_from_base, 

48 walk_orm_tree, 

49) 

50from sqlalchemy import insert, Integer 

51from sqlalchemy.exc import CompileError 

52from sqlalchemy.engine.base import Engine 

53from sqlalchemy.orm import Session as SqlASession 

54from sqlalchemy.sql.schema import Column, MetaData, Table 

55 

56from camcops_server.cc_modules.cc_blob import Blob 

57from camcops_server.cc_modules.cc_db import ( 

58 GenericTabletRecordMixin, 

59 TaskDescendant, 

60) 

61from camcops_server.cc_modules.cc_device import Device 

62from camcops_server.cc_modules.cc_email import Email 

63from camcops_server.cc_modules.cc_exportmodels import ( 

64 ExportedTask, 

65 ExportedTaskEmail, 

66 ExportedTaskFileGroup, 

67 ExportedTaskHL7Message, 

68) 

69from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient 

70from camcops_server.cc_modules.cc_group import Group, group_group_table 

71from camcops_server.cc_modules.cc_membership import UserGroupMembership 

72from camcops_server.cc_modules.cc_patient import Patient 

73from camcops_server.cc_modules.cc_patientidnum import ( 

74 all_extra_id_columns, 

75 PatientIdNum, 

76) 

77from camcops_server.cc_modules.cc_sqla_coltypes import camcops_column 

78from camcops_server.cc_modules.cc_task import Task 

79from camcops_server.cc_modules.cc_user import User 

80 

81if TYPE_CHECKING: 

82 from camcops_server.cc_modules.cc_request import CamcopsRequest 

83 from camcops_server.cc_modules.cc_summaryelement import ExtraSummaryTable 

84 from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions 

85 

86log = BraceStyleAdapter(logging.getLogger(__name__)) 

87 

88 

89# ============================================================================= 

90# Constants 

91# ============================================================================= 

92 

93# Restrict specified tables to certain columns only: 

94DUMP_ONLY_COLNAMES = { # mapping of tablename : list_of_column_names 

95 Device.__tablename__: ["camcops_version", "friendly_name", "id", "name"], 

96 User.__tablename__: ["fullname", "id", "username"], 

97} 

98# Drop specific columns from certain tables: 

99# mapping of tablename : list_of_column_names 

100DUMP_DROP_COLNAMES: dict[str, list[str]] = {} 

101# List of columns to be skipped regardless of table: 

102DUMP_SKIP_COLNAMES = [ 

103 # We restrict to current records only, so many of these are irrelevant: 

104 "_addition_pending", 

105 "_forcibly_preserved", 

106 "_manually_erased", 

107 "_manually_erased_at", 

108 "_manually_erasing_user_id", 

109 "_move_off_tablet", 

110 "_removal_pending", 

111 "_removing_user_id", 

112 "_successor_pk", 

113 "_when_removed_batch_utc", 

114 "_when_removed_exact", 

115] 

116DUMP_SKIP_RELNAMES = [ 

117 # List of *relationship* names to ignore 

118 "_manually_erasing_user", 

119 "_removing_user", 

120] 

121# List of table names to be skipped at all times: 

122DUMP_SKIP_TABLES = [ 

123 # We don't have to list all admin tables here; we process the dump starting 

124 # with tasks, so only things that have ORM relationships to a task might 

125 # feature. (The Email/ExportedTask* set don't, so this is just caution in 

126 # case we add a relationship later!) 

127 Email.__tablename__, 

128 ExportedTask.__tablename__, 

129 ExportedTaskEmail.__tablename__, 

130 ExportedTaskFileGroup.__tablename__, 

131 ExportedTaskHL7Message.__tablename__, 

132 ExportRecipient.__tablename__, 

133 group_group_table.name, 

134 UserGroupMembership.__tablename__, 

135] 

136# Tables for which no relationships will be traversed: 

137DUMP_SKIP_ALL_RELS_FOR_TABLES = [Group.__tablename__] 

138FOREIGN_KEY_CONSTRAINTS_IN_DUMP = False 

139# ... the keys will be present, but should we try to enforce constraints? 

140 

141 

142# ============================================================================= 

143# Handy place to hold the controlling information 

144# ============================================================================= 

145 

146 

147class DumpController(object): 

148 """ 

149 A controller class that manages the copying (dumping) of information from 

150 our database to another SQLAlchemy :class:`Engine`/:class:`Session`. 

151 """ 

152 

153 def __init__( 

154 self, 

155 dst_engine: Engine, 

156 dst_session: SqlASession, 

157 export_options: "TaskExportOptions", 

158 req: "CamcopsRequest", 

159 ) -> None: 

160 """ 

161 Args: 

162 dst_engine: destination SQLAlchemy Engine 

163 dst_session: destination SQLAlchemy Session 

164 export_options: :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions` 

165 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

166 """ # noqa 

167 self.dst_engine = dst_engine 

168 self.dst_session = dst_session 

169 self.export_options = export_options 

170 self.req = req 

171 

172 # We start with blank metadata. 

173 self.dst_metadata = MetaData() 

174 # Tables we are inserting into the destination database: 

175 self.dst_tables = {} # type: Dict[str, Table] 

176 # ... note that creating a Table() for a given SQLAlchemy metadata is 

177 # permitted only once, so we add to self.dst_tables as soon 

178 # as we create that. 

179 # Tables we've created: 

180 self.tablenames_created = set() # type: Set[str] 

181 # Tables we've processed, though we may ignore them: 

182 self.tablenames_seen = set() # type: Set[str] 

183 # ORM objects we've visited: 

184 self.instances_seen = set() # type: Set[object] 

185 

186 if export_options.db_make_all_tables_even_empty: 

187 self._create_all_dest_tables() 

188 

189 def _create_all_dest_tables(self) -> None: 

190 """ 

191 Creates all tables in the destination database, even ones that may 

192 not be used. 

193 """ 

194 log.debug("Creating all destination tables...") 

195 for table in self.gen_all_dest_tables(): 

196 self._create_dest_table(table) 

197 log.debug("... all destination tables created.") 

198 

199 def gen_all_dest_tables(self) -> Generator[Table, None, None]: 

200 """ 

201 Generates all destination tables. 

202 """ 

203 tablenames_seen = set() # type: Set[str] 

204 for cls in gen_orm_classes_from_base( 

205 GenericTabletRecordMixin 

206 ): # type: Type[GenericTabletRecordMixin] 

207 instance = cls() 

208 for table in self.gen_all_dest_tables_for_obj(instance): 

209 if table.name in tablenames_seen: 

210 continue 

211 tablenames_seen.add(table.name) 

212 yield table 

213 

214 def gen_all_dest_tables_for_obj( 

215 self, src_obj: object 

216 ) -> Generator[Table, None, None]: 

217 """ 

218 Generates all destination tables for an object. 

219 """ 

220 # Main table 

221 yield self.get_dest_table_for_src_object(src_obj) 

222 # Additional tables 

223 if isinstance(src_obj, Task): 

224 add_extra_id_cols = ( 

225 self.export_options.db_patient_id_in_each_row 

226 and not src_obj.is_anonymous 

227 ) 

228 estables = src_obj.get_all_summary_tables(self.req) 

229 for est in estables: 

230 yield self.get_dest_table_for_est( 

231 est, add_extra_id_cols=add_extra_id_cols 

232 ) 

233 

234 def gen_all_dest_columns(self) -> Generator[Column, None, None]: 

235 """ 

236 Generates all destination columns. 

237 """ 

238 for table in self.gen_all_dest_tables(): 

239 if not self._dump_skip_table(table.name): 

240 for col in table.columns: 

241 if col.name not in DUMP_SKIP_COLNAMES: 

242 yield col 

243 

244 def consider_object(self, src_obj: object) -> None: 

245 """ 

246 Think about an SQLAlchemy ORM object. If it comes from a table we 

247 want dumped, add this object to the dump. 

248 """ 

249 # noinspection PyUnresolvedReferences 

250 src_table = src_obj.__table__ # type: ignore[attr-defined] 

251 src_tablename = src_table.name 

252 if src_tablename not in self.tablenames_seen: 

253 # If we encounter a table we've not seen, offer our "table decider" 

254 # the opportunity to add it to the metadata and create the table. 

255 self._add_dump_table_for_src_object(src_obj) 

256 # If this table is going into the destination, copy the object 

257 # (and maybe remove columns from it, or add columns to it). 

258 if src_tablename in self.dst_tables and not self._dump_skip_table( 

259 src_tablename 

260 ): 

261 self._copy_object_to_dump(src_obj) 

262 

263 @staticmethod 

264 def _merits_extra_id_num_columns( 

265 obj: object, 

266 ) -> Tuple[bool, Optional[Patient]]: 

267 """ 

268 Is the source object one that would support the addition of extra 

269 ID number information if the export option ``DB_PATIENT_ID_PER_ROW`` is 

270 set? If so, return the relevant patient. 

271 

272 Args: 

273 obj: an SQLAlchemy ORM object 

274 

275 Returns: 

276 tuple: ``(merits, patient)``, where ``merits`` is a ``bool`` (does 

277 it merit this?) and ``patient`` is a relevant 

278 :class:`camcops_server.cc_modules.cc_patient.Patient``, if found. 

279 It is also guaranteed that if a patient is returned, ``merits`` is 

280 ``True`` (but not guaranteed that if ``merits`` is true, that 

281 ``patient`` is not ``None``). 

282 

283 """ 

284 if not isinstance(obj, GenericTabletRecordMixin): 

285 # Must be data that originated from the client. 

286 return False, None 

287 if isinstance(obj, PatientIdNum): 

288 # PatientIdNum already has this info. 

289 return False, None 

290 if isinstance(obj, Patient): 

291 return True, obj 

292 if isinstance(obj, Task): 

293 if obj.is_anonymous: 

294 # Anonymous tasks don't. 

295 return False, None 

296 return True, obj.patient 

297 if isinstance(obj, TaskDescendant): 

298 merits = obj.task_ancestor_might_have_patient() 

299 patient = obj.task_ancestor_patient() 

300 return merits, patient 

301 log.warning( 

302 f"_merits_extra_id_num_columns_if_requested: don't know " 

303 f"how to handle {obj!r}" 

304 ) 

305 return False, None 

306 

307 def get_dest_table_for_src_object(self, src_obj: object) -> Table: 

308 """ 

309 Produces the destination table for the source object. 

310 

311 Args: 

312 src_obj: 

313 An SQLAlchemy ORM object. It will *not* be a 

314 :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`; 

315 those are handled instead by 

316 :meth:`_get_or_insert_summary_table`. 

317 

318 Returns: 

319 an SQLAlchemy :class:`Table` 

320 """ 

321 # noinspection PyUnresolvedReferences 

322 src_table = src_obj.__table__ # type: ignore[attr-defined] 

323 tablename = src_table.name 

324 

325 # Don't create it twice in the SQLAlchemy metadata. 

326 if tablename in self.dst_tables: 

327 return self.dst_tables[tablename] 

328 

329 dst_table = src_table.to_metadata(self.dst_metadata) 

330 

331 # Copy columns, dropping any we don't want, and dropping FK constraints 

332 changed_columns = [] # type: List[Column] 

333 

334 for dst_column in dst_table.columns: 

335 if dst_column.foreign_keys: 

336 changed_columns.append( 

337 # Trying to set index=dst_column.index here results in 

338 # index ... already exists error when the table is created. 

339 Column( 

340 dst_column.name, 

341 Integer, 

342 nullable=dst_column.nullable, 

343 comment=dst_column.comment, 

344 ) 

345 ) 

346 elif self._dump_skip_column(tablename, dst_column.name): 

347 changed_columns.append(Column(dst_column.name, Integer)) 

348 

349 # Add extra columns? 

350 if self.export_options.db_include_summaries: 

351 if isinstance(src_obj, GenericTabletRecordMixin): 

352 for summary_element in src_obj.get_summaries(self.req): 

353 changed_columns.append( 

354 camcops_column( 

355 summary_element.name, 

356 summary_element.coltype, 

357 exempt_from_anonymisation=True, 

358 comment=summary_element.decorated_comment, 

359 ) 

360 ) 

361 if self.export_options.db_patient_id_in_each_row: 

362 merits, _ = self._merits_extra_id_num_columns(src_obj) 

363 if merits: 

364 changed_columns.extend(all_extra_id_columns(self.req)) 

365 if isinstance(src_obj, TaskDescendant): 

366 changed_columns += src_obj.extra_task_xref_columns() 

367 

368 dst_table = Table( 

369 tablename, 

370 self.dst_metadata, 

371 *changed_columns, 

372 extend_existing=True, 

373 ) 

374 # ... that modifies the metadata, so: 

375 self.dst_tables[tablename] = dst_table 

376 return dst_table 

377 

378 def get_dest_table_for_est( 

379 self, est: "ExtraSummaryTable", add_extra_id_cols: bool = False 

380 ) -> Table: 

381 """ 

382 Add an additional summary table to the dump, if it's not there already. 

383 Return the table (from the destination database). 

384 

385 Args: 

386 est: 

387 a 

388 :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable` 

389 add_extra_id_cols: 

390 Add extra ID columns, for the ``DB_PATIENT_ID_PER_ROW`` 

391 export option? 

392 """ 

393 tablename = est.tablename 

394 if tablename in self.dst_tables: 

395 return self.dst_tables[tablename] 

396 

397 columns = est.columns.copy() 

398 if add_extra_id_cols: 

399 columns.extend(all_extra_id_columns(self.req)) 

400 columns.extend(est.extra_task_xref_columns()) 

401 table = Table(tablename, self.dst_metadata, *columns) 

402 # ... that modifies the metadata, so: 

403 self.dst_tables[tablename] = table 

404 return table 

405 

406 def _add_dump_table_for_src_object(self, src_obj: object) -> None: 

407 """ 

408 - Mark the object's table as seen. 

409 

410 - If we want it, add it to the metadata and execute a CREATE TABLE 

411 command. 

412 

413 - We may translate the table en route. 

414 

415 Args: 

416 src_obj: 

417 An SQLAlchemy ORM object. It will *not* be a 

418 :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`; 

419 those are handled instead by 

420 :meth:`_get_or_insert_summary_table`. 

421 """ 

422 # noinspection PyUnresolvedReferences 

423 src_table = src_obj.__table__ # type: ignore[attr-defined] 

424 tablename = src_table.name 

425 self.tablenames_seen.add(tablename) 

426 

427 # Skip the table? 

428 if self._dump_skip_table(tablename): 

429 return 

430 

431 # Get the table definition 

432 dst_table = self.get_dest_table_for_src_object(src_obj) 

433 # Create it 

434 self._create_dest_table(dst_table) 

435 

436 def _create_dest_table(self, dst_table: Table) -> None: 

437 """ 

438 Creates a table in the destination database. 

439 """ 

440 tablename = dst_table.name 

441 if tablename in self.tablenames_created: 

442 return # don't create it twice 

443 # Create the table 

444 # log.debug("Adding table {!r} to dump output", tablename) 

445 # You have to use an engine, not a session, to create tables (or you 

446 # get "AttributeError: 'Session' object has no attribute 

447 # '_run_visitor'"). 

448 # However, you have to commit the session, or you get 

449 # "sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) 

450 # database is locked", since a session is also being used. 

451 self.dst_session.commit() 

452 dst_table.create(self.dst_engine) 

453 self.tablenames_created.add(tablename) 

454 

455 def _copy_object_to_dump(self, src_obj: object) -> None: 

456 """ 

457 Copy the SQLAlchemy ORM object to the dump. 

458 """ 

459 # noinspection PyUnresolvedReferences 

460 src_table = src_obj.__table__ # type: ignore[attr-defined] 

461 adding_extra_ids = False 

462 patient = None # type: Optional[Patient] 

463 if self.export_options.db_patient_id_in_each_row: 

464 adding_extra_ids, patient = self._merits_extra_id_num_columns( 

465 src_obj 

466 ) 

467 

468 # 1. Insert row for this object, potentially adding and removing 

469 # columns. 

470 tablename = src_table.name 

471 dst_table = self.dst_tables[tablename] 

472 assert dst_table.name == tablename 

473 row = {} # type: Dict[str, Any] 

474 # Copy columns, skipping any we don't want 

475 for attrname, column in gen_columns(src_obj): 

476 if self._dump_skip_column(tablename, column.name): 

477 continue 

478 row[column.name] = getattr(src_obj, attrname) 

479 # Any other columns to add for this table? 

480 if isinstance(src_obj, GenericTabletRecordMixin): 

481 if self.export_options.db_include_summaries: 

482 for summary_element in src_obj.get_summaries(self.req): 

483 row[summary_element.name] = summary_element.value 

484 if adding_extra_ids: 

485 if patient: 

486 patient.add_extra_idnum_info_to_row(row) 

487 if isinstance(src_obj, TaskDescendant): 

488 src_obj.add_extra_task_xref_info_to_row(row) 

489 try: 

490 self.dst_session.execute(insert(dst_table).values(row)) 

491 except CompileError: 

492 log.critical("\ndst_table:\n{}\nrow:\n{}", dst_table, row) 

493 raise 

494 

495 # 2. If required, add extra tables/rows that this task wants to 

496 # offer (usually tables whose rows don't have a 1:1 correspondence 

497 # to the task or its ancillary objects). 

498 if isinstance(src_obj, Task): 

499 estables = src_obj.get_all_summary_tables(self.req) 

500 # ... includes SNOMED 

501 for est in estables: 

502 dst_summary_table = self._get_or_insert_summary_table( 

503 est, add_extra_id_cols=adding_extra_ids 

504 ) 

505 for row in est.rows: 

506 if patient: 

507 patient.add_extra_idnum_info_to_row(row) 

508 if adding_extra_ids: 

509 est.add_extra_task_xref_info_to_row(row) 

510 try: 

511 self.dst_session.execute( 

512 insert(dst_summary_table).values(row) 

513 ) 

514 except CompileError: 

515 log.critical( 

516 "\ndst_summary_table:\n{}\nrow:\n{}", 

517 dst_table, 

518 row, 

519 ) 

520 raise 

521 

522 def _get_or_insert_summary_table( 

523 self, est: "ExtraSummaryTable", add_extra_id_cols: bool = False 

524 ) -> Table: 

525 """ 

526 Add an additional summary table to the dump, if it's not there already. 

527 Return the table (from the destination database). 

528 

529 Args: 

530 est: 

531 a 

532 :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable` 

533 add_extra_id_cols: 

534 Add extra ID columns, for the ``DB_PATIENT_ID_PER_ROW`` 

535 export option? 

536 """ 

537 tablename = est.tablename 

538 if tablename not in self.tablenames_created: 

539 table = self.get_dest_table_for_est( 

540 est, add_extra_id_cols=add_extra_id_cols 

541 ) 

542 self._create_dest_table(table) 

543 return self.dst_tables[tablename] 

544 

545 def _dump_skip_table(self, tablename: str) -> bool: 

546 """ 

547 Should we skip this table (omit it from the dump)? 

548 """ 

549 if ( 

550 not self.export_options.include_blobs 

551 and tablename == Blob.__tablename__ 

552 ): 

553 return True 

554 if tablename in DUMP_SKIP_TABLES: 

555 return True 

556 return False 

557 

558 @staticmethod 

559 def _dump_skip_column(tablename: str, columnname: str) -> bool: 

560 """ 

561 Should we skip this column (omit it from the dump)? 

562 """ 

563 if columnname in DUMP_SKIP_COLNAMES: 

564 return True 

565 if ( 

566 tablename in DUMP_ONLY_COLNAMES 

567 and columnname not in DUMP_ONLY_COLNAMES[tablename] 

568 ): 

569 return True 

570 if ( 

571 tablename in DUMP_DROP_COLNAMES 

572 and columnname in DUMP_DROP_COLNAMES[tablename] 

573 ): 

574 return True 

575 return False 

576 

577 

578# ============================================================================= 

579# Copying stuff to a dump 

580# ============================================================================= 

581 

582 

583def copy_tasks_and_summaries( 

584 tasks: Iterable[Task], 

585 dst_engine: Engine, 

586 dst_session: SqlASession, 

587 export_options: "TaskExportOptions", 

588 req: "CamcopsRequest", 

589) -> None: 

590 """ 

591 Copy a set of tasks, and their associated related information (found by 

592 walking the SQLAlchemy ORM tree), to the dump. 

593 

594 Args: 

595 tasks: tasks to copy 

596 dst_engine: destination SQLAlchemy Engine 

597 dst_session: destination SQLAlchemy Session 

598 export_options: :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions` 

599 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

600 """ # noqa 

601 # How best to create the structure that's required? 

602 # 

603 # https://stackoverflow.com/questions/21770829/sqlalchemy-copy-schema-and-data-of-subquery-to-another-database # noqa 

604 # https://stackoverflow.com/questions/40155340/sqlalchemy-reflect-and-copy-only-subset-of-existing-schema # noqa 

605 # 

606 # - Should we attempt to copy the MetaData object? That seems extremely 

607 # laborious, since every ORM class is tied to it. Moreover, 

608 # MetaData.tables is an immutabledict, so we're not going to be editing 

609 # anything. Even if we cloned the MetaData, that's not going to give us 

610 # ORM classes to walk. 

611 # - Shall we operate at a lower level? That seems sensible. 

612 # - Given that... we don't need to translate the PKs at all, unlike 

613 # merge_db. 

614 # - Let's not create FK constraints explicitly. Most are not achievable 

615 # anyway (e.g. linking on device/era; omission of BLOBs). 

616 

617 controller = DumpController( 

618 dst_engine=dst_engine, 

619 dst_session=dst_session, 

620 export_options=export_options, 

621 req=req, 

622 ) 

623 

624 # We walk through all the objects. 

625 log.debug("Starting to copy tasks...") 

626 for startobj in tasks: 

627 log.debug("Processing task: {!r}", startobj) 

628 for src_obj in walk_orm_tree( 

629 startobj, 

630 seen=controller.instances_seen, 

631 skip_relationships_always=DUMP_SKIP_RELNAMES, 

632 skip_all_relationships_for_tablenames=DUMP_SKIP_ALL_RELS_FOR_TABLES, # noqa 

633 skip_all_objects_for_tablenames=DUMP_SKIP_TABLES, 

634 ): 

635 controller.consider_object(src_obj) 

636 log.debug("... finished copying tasks.")