Coverage for cc_modules/tests/cc_dump_tests.py: 99%

161 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-03-08 14:45 +0000

1""" 

2camcops_server/cc_modules/tests/cc_dump_tests.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26""" 

27 

28from typing import List, TYPE_CHECKING 

29 

30import pytest 

31from sqlalchemy import select 

32from sqlalchemy.sql.expression import table, text 

33from sqlalchemy.sql.schema import Column 

34from sqlalchemy.sql.sqltypes import String 

35 

36from camcops_server.cc_modules.cc_constants import EXTRA_TASK_TABLENAME_FIELD 

37from camcops_server.cc_modules.cc_db import ( 

38 SFN_CAMCOPS_SERVER_VERSION, 

39 SFN_IS_COMPLETE, 

40) 

41 

42from camcops_server.cc_modules.cc_dump import ( 

43 DumpController, 

44 copy_tasks_and_summaries, 

45) 

46from camcops_server.cc_modules.cc_db import FN_PK, FN_ADDITION_PENDING 

47from camcops_server.cc_modules.cc_patientidnum import extra_id_colname 

48from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions 

49from camcops_server.cc_modules.cc_summaryelement import ExtraSummaryTable 

50from camcops_server.cc_modules.cc_testfactories import ( 

51 NHSPatientIdNumFactory, 

52 PatientFactory, 

53) 

54from camcops_server.cc_modules.cc_unittest import DemoRequestTestCase 

55from camcops_server.tasks.tests.factories import ( 

56 BmiFactory, 

57 PhotoSequenceFactory, 

58) 

59 

60if TYPE_CHECKING: 

61 from sqlalchemy import Engine 

62 from sqlalchemy.orm import Session 

63 

64 

65@pytest.mark.usefixtures("setup_temp_session") 

66class DumpTestCase(DemoRequestTestCase): 

67 temp_engine: "Engine" 

68 temp_session: "Session" 

69 

70 

71class GetDestTableForSrcObjectTests(DumpTestCase): 

72 def test_copies_column_comments(self) -> None: 

73 patient = PatientFactory() 

74 src_table = patient.__table__ 

75 

76 options = TaskExportOptions() 

77 controller = DumpController( 

78 self.temp_engine, self.temp_session, options, self.req 

79 ) 

80 

81 dest_table = controller.get_dest_table_for_src_object(patient) 

82 

83 self.assertEqual(src_table.c.id.comment, dest_table.c.id.comment) 

84 

85 def test_copies_column_info_for_foreign_key(self) -> None: 

86 patient = PatientFactory() 

87 src_table = patient.__table__ 

88 

89 options = TaskExportOptions() 

90 controller = DumpController( 

91 self.temp_engine, self.temp_session, options, self.req 

92 ) 

93 

94 dest_table = controller.get_dest_table_for_src_object(patient) 

95 

96 self.assertFalse(dest_table.c._device_id.nullable) 

97 # TODO: if we think it is worth it. Currently will result in index 

98 # exists error if we try to set this on a removed foreign key. 

99 # self.assertTrue(dest_table.c._device_id.index) 

100 self.assertEqual( 

101 src_table.c._device_id.comment, dest_table.c._device_id.comment 

102 ) 

103 

104 def test_foreign_keys_are_empty_set(self) -> None: 

105 patient = PatientFactory() 

106 bmi = BmiFactory(patient=patient) 

107 

108 options = TaskExportOptions() 

109 controller = DumpController( 

110 self.temp_engine, self.temp_session, options, self.req 

111 ) 

112 

113 dest_table = controller.get_dest_table_for_src_object(bmi) 

114 

115 self.assertEqual(dest_table.c.patient_id.foreign_keys, set()) 

116 

117 def test_tablet_record_includes_summaries(self) -> None: 

118 patient = PatientFactory() 

119 bmi = BmiFactory(patient=patient) 

120 

121 options = TaskExportOptions(db_include_summaries=True) 

122 controller = DumpController( 

123 self.temp_engine, self.temp_session, options, self.req 

124 ) 

125 

126 dest_table = controller.get_dest_table_for_src_object(bmi) 

127 summary_names = [ 

128 SFN_IS_COMPLETE, 

129 SFN_CAMCOPS_SERVER_VERSION, 

130 ] # not exhaustive list 

131 

132 dest_names = [c.name for c in dest_table.c] 

133 self.assertLess(set(summary_names), set(dest_names)) 

134 

135 def test_has_extra_id_num_columns(self) -> None: 

136 patient = PatientFactory() 

137 idnum = NHSPatientIdNumFactory(patient=patient) 

138 

139 options = TaskExportOptions(db_patient_id_per_row=True) 

140 controller = DumpController( 

141 self.temp_engine, self.temp_session, options, self.req 

142 ) 

143 

144 dest_table = controller.get_dest_table_for_src_object(patient) 

145 dest_names = [c.name for c in dest_table.c] 

146 

147 self.assertIn(extra_id_colname(idnum.which_idnum), dest_names) 

148 

149 def test_task_descendant_has_extra_task_xref_columns(self) -> None: 

150 patient = PatientFactory() 

151 photo_sequence = PhotoSequenceFactory(patient=patient, photos=1) 

152 

153 options = TaskExportOptions(db_patient_id_per_row=True) 

154 controller = DumpController( 

155 self.temp_engine, self.temp_session, options, self.req 

156 ) 

157 

158 single_photo = photo_sequence.photos[0] 

159 

160 dest_table = controller.get_dest_table_for_src_object(single_photo) 

161 dest_names = [c.name for c in dest_table.c] 

162 

163 self.assertIn(EXTRA_TASK_TABLENAME_FIELD, dest_names) 

164 

165 

166class GetDestTableForEstTests(DumpTestCase): 

167 def test_copies_table_with_subset_of_columns(self) -> None: 

168 patient = PatientFactory() 

169 bmi = BmiFactory(patient=patient) 

170 

171 options = TaskExportOptions() 

172 controller = DumpController( 

173 self.temp_engine, self.temp_session, options, self.req 

174 ) 

175 

176 columns = [ 

177 Column("one", String), 

178 Column("two", String), 

179 Column("three", String), 

180 ] 

181 

182 est = ExtraSummaryTable( 

183 tablename="test_tablename", 

184 xmlname="test_xmlname", 

185 columns=columns, 

186 rows=[], 

187 task=bmi, 

188 ) 

189 dest_table = controller.get_dest_table_for_est(est) 

190 

191 src_names = [c.name for c in columns] 

192 dest_names = [c.name for c in dest_table.c] 

193 

194 self.assertEqual(set(dest_names), set(src_names)) 

195 

196 def test_appends_extra_id_columns(self) -> None: 

197 patient = PatientFactory() 

198 idnum = NHSPatientIdNumFactory(patient=patient) 

199 bmi = BmiFactory(patient=patient) 

200 

201 options = TaskExportOptions() 

202 controller = DumpController( 

203 self.temp_engine, self.temp_session, options, self.req 

204 ) 

205 

206 est = ExtraSummaryTable( 

207 tablename="test_tablename", 

208 xmlname="test_xmlname", 

209 columns=[], 

210 rows=[], 

211 task=bmi, 

212 ) 

213 dest_table = controller.get_dest_table_for_est( 

214 est, add_extra_id_cols=True 

215 ) 

216 

217 dest_names = [c.name for c in dest_table.c] 

218 

219 self.assertIn(extra_id_colname(idnum.which_idnum), dest_names) 

220 

221 def test_appends_extra_task_xref_columns(self) -> None: 

222 patient = PatientFactory() 

223 photo_sequence = PhotoSequenceFactory(patient=patient, photos=1) 

224 

225 options = TaskExportOptions() 

226 controller = DumpController( 

227 self.temp_engine, self.temp_session, options, self.req 

228 ) 

229 

230 est = ExtraSummaryTable( 

231 tablename="test_tablename", 

232 xmlname="test_xmlname", 

233 columns=[], 

234 rows=[], 

235 task=photo_sequence, 

236 ) 

237 dest_table = controller.get_dest_table_for_est( 

238 est, add_extra_id_cols=True 

239 ) 

240 

241 dest_names = [c.name for c in dest_table.c] 

242 

243 self.assertIn(EXTRA_TASK_TABLENAME_FIELD, dest_names) 

244 

245 

246class CopyTasksAndSummariesTests(DumpTestCase): 

247 def test_task_fields_copied(self) -> None: 

248 export_options = TaskExportOptions( 

249 include_blobs=False, 

250 db_patient_id_per_row=False, 

251 db_include_summaries=False, 

252 ) 

253 

254 patient = PatientFactory() 

255 bmi = BmiFactory(patient=patient) 

256 

257 copy_tasks_and_summaries( 

258 tasks=[bmi], 

259 dst_engine=self.temp_engine, 

260 dst_session=self.temp_session, 

261 export_options=export_options, 

262 req=self.req, 

263 ) 

264 self.temp_session.commit() 

265 

266 query = select(text("*")).select_from(table("bmi")) 

267 result = self.temp_session.execute(query) 

268 

269 row = next(result) 

270 

271 # Normal columns 

272 self.assertAlmostEqual(row.height_m, bmi.height_m) 

273 self.assertAlmostEqual(row.mass_kg, bmi.mass_kg) 

274 

275 # Should have been nulled 

276 for colname in [ 

277 "_addition_pending", 

278 "_forcibly_preserved", 

279 "_manually_erased", 

280 ]: # not exhaustive list 

281 self.assertIsNone(getattr(row, colname)) 

282 

283 # No summaries 

284 self.assertFalse(hasattr(row, SFN_IS_COMPLETE)) 

285 self.assertFalse(hasattr(row, SFN_CAMCOPS_SERVER_VERSION)) 

286 

287 def test_summary_fields_copied(self) -> None: 

288 export_options = TaskExportOptions( 

289 include_blobs=False, 

290 db_patient_id_per_row=False, 

291 db_include_summaries=True, 

292 ) 

293 

294 patient = PatientFactory() 

295 bmi = BmiFactory(patient=patient) 

296 

297 copy_tasks_and_summaries( 

298 tasks=[bmi], 

299 dst_engine=self.temp_engine, 

300 dst_session=self.temp_session, 

301 export_options=export_options, 

302 req=self.req, 

303 ) 

304 self.temp_session.commit() 

305 

306 query = select(text("*")).select_from(table("bmi")) 

307 result = self.temp_session.execute(query) 

308 

309 row = next(result) 

310 

311 self.assertTrue(hasattr(row, SFN_IS_COMPLETE)) 

312 self.assertTrue(hasattr(row, SFN_CAMCOPS_SERVER_VERSION)) 

313 

314 def test_has_extra_id_num_columns(self) -> None: 

315 export_options = TaskExportOptions( 

316 include_blobs=False, 

317 db_patient_id_per_row=True, 

318 db_include_summaries=False, 

319 ) 

320 

321 patient = PatientFactory() 

322 idnum = NHSPatientIdNumFactory(patient=patient) 

323 bmi = BmiFactory(patient=patient) 

324 

325 copy_tasks_and_summaries( 

326 tasks=[bmi], 

327 dst_engine=self.temp_engine, 

328 dst_session=self.temp_session, 

329 export_options=export_options, 

330 req=self.req, 

331 ) 

332 query = select(text("*")).select_from(table("bmi")) 

333 result = self.temp_session.execute(query) 

334 

335 row = next(result) 

336 

337 self.assertEqual( 

338 getattr(row, extra_id_colname(idnum.which_idnum)), 

339 idnum.idnum_value, 

340 ) 

341 

342 def test_has_extra_task_xref_columns(self) -> None: 

343 export_options = TaskExportOptions( 

344 include_blobs=False, 

345 db_patient_id_per_row=True, 

346 db_include_summaries=False, 

347 ) 

348 

349 patient = PatientFactory() 

350 photo_sequence = PhotoSequenceFactory(patient=patient, photos=1) 

351 

352 copy_tasks_and_summaries( 

353 tasks=[photo_sequence], 

354 dst_engine=self.temp_engine, 

355 dst_session=self.temp_session, 

356 export_options=export_options, 

357 req=self.req, 

358 ) 

359 query = select(text("*")).select_from(table("photosequence_photos")) 

360 result = self.temp_session.execute(query) 

361 

362 row = next(result) 

363 

364 self.assertEqual( 

365 getattr(row, EXTRA_TASK_TABLENAME_FIELD), "photosequence" 

366 ) 

367 

368 

369class GenAllDestColumnsTests(DumpTestCase): 

370 def test_omits_irrelevant_columns(self) -> None: 

371 options = TaskExportOptions( 

372 include_blobs=False, 

373 db_patient_id_per_row=False, 

374 db_make_all_tables_even_empty=True, 

375 db_include_summaries=False, 

376 ) 

377 

378 controller = DumpController( 

379 self.temp_engine, self.temp_session, options, self.req 

380 ) 

381 

382 table_column_names: dict[str, List[str]] = {} 

383 columns = controller.gen_all_dest_columns() 

384 

385 for column in columns: 

386 table_column_names.setdefault(column.table.name, []).append( 

387 column.name 

388 ) 

389 

390 self.assertIn("bmi", table_column_names) 

391 self.assertNotIn("blobs", table_column_names) 

392 self.assertIn(FN_PK, table_column_names["bmi"]) 

393 self.assertNotIn(FN_ADDITION_PENDING, table_column_names["bmi"])