Coverage for cc_modules/cc_dummy_database.py: 26%

222 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

1""" 

2camcops_server/cc_modules/cc_dummy_database.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Functions for dummy database creation for manual testing.** 

27 

28""" 

29 

30import logging 

31import random 

32from typing import Optional, TYPE_CHECKING 

33 

34from cardinal_pythonlib.datetimefunc import ( 

35 convert_datetime_to_utc, 

36 format_datetime, 

37) 

38from cardinal_pythonlib.logs import BraceStyleAdapter 

39from cardinal_pythonlib.nhs import generate_random_nhs_number 

40from faker import Faker 

41import pendulum 

42from sqlalchemy.exc import IntegrityError 

43from sqlalchemy.orm.session import sessionmaker 

44from sqlalchemy.sql.expression import func 

45from sqlalchemy.sql.schema import Column 

46from sqlalchemy.sql.sqltypes import ( 

47 Boolean, 

48 Date, 

49 Float, 

50 Integer, 

51 String, 

52 UnicodeText, 

53) 

54 

55from camcops_server.cc_modules.cc_constants import DateFormat 

56from camcops_server.cc_modules.cc_db import TASK_FREQUENT_AND_FK_FIELDS 

57from camcops_server.cc_modules.cc_device import Device 

58from camcops_server.cc_modules.cc_group import Group 

59from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition 

60from camcops_server.cc_modules.cc_patient import Patient 

61from camcops_server.cc_modules.cc_patientidnum import PatientIdNum 

62from camcops_server.cc_modules.cc_sqla_coltypes import ( 

63 COLATTR_PERMITTED_VALUE_CHECKER, 

64 PendulumDateTimeAsIsoTextColType, 

65) 

66 

67from camcops_server.cc_modules.cc_task import Task 

68from camcops_server.cc_modules.cc_user import User 

69from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION 

70 

71 

72if TYPE_CHECKING: 

73 from sqlalchemy.orm import Session as SqlASession 

74 from camcops_server.cc_modules.cc_config import CamcopsConfig 

75 from camcops_server.cc_modules.cc_db import GenericTabletRecordMixin 

76 

77log = BraceStyleAdapter(logging.getLogger(__name__)) 

78 

79 

80# ============================================================================= 

81# DummyDataInserter 

82# ============================================================================= 

83 

84 

class DummyDataInserter:
    """
    Populates tasks (and similar objects) with random values that respect
    any constraints declared on their columns. Nothing in this class talks
    to a database, so its methods work on free-floating objects too.
    """

    DEFAULT_MIN_FLOAT = 0
    DEFAULT_MAX_FLOAT = 1000

    DEFAULT_MIN_INTEGER = 0
    DEFAULT_MAX_INTEGER = 1000

    def __init__(self) -> None:
        self.faker = Faker("en_GB")

    @staticmethod
    def column_is_q_field(column: Column) -> bool:
        """
        Should this column be filled with random data?

        Returns ``False`` for columns whose name begins with an underscore
        or is listed in ``TASK_FREQUENT_AND_FK_FIELDS``; ``True`` otherwise.
        """
        name = column.name
        # It's that or TASK_FREQUENT_FIELDS.
        excluded = name.startswith("_") or name in TASK_FREQUENT_AND_FK_FIELDS
        return not excluded

    def fill_in_task_fields(self, task: Task) -> None:
        """
        Inserts random data into a task (within any known constraints).

        Each eligible column is handed to the setter for the first matching
        column type below; columns of any other type are left alone.
        """
        # Order matters: the first matching type wins, so UnicodeText must
        # precede String (which covers String, Text and UnicodeText).
        setters_by_type = (
            (Integer, self.set_integer_field),
            (Float, self.set_float_field),
            (Boolean, self.set_bool_field),
            (Date, self.set_date_field),
            (PendulumDateTimeAsIsoTextColType, self.set_datetime_field),
            (UnicodeText, self.set_unicode_text_field),
            (String, self.set_string_field),
        )

        # noinspection PyUnresolvedReferences
        for column in task.__table__.columns:
            if not self.column_is_q_field(column):  # type: ignore[arg-type]
                continue

            coltype = column.type
            for sqla_type, setter in setters_by_type:
                if isinstance(coltype, sqla_type):
                    setter(task, column)  # type: ignore[arg-type]
                    break

    def set_integer_field(self, task: Task, column: Column) -> None:
        """Stores a valid random integer in the task's ``column``."""
        value = self.get_valid_integer_for_field(column)
        setattr(task, column.name, value)

    def set_float_field(self, task: Task, column: Column) -> None:
        """Stores a valid random float in the task's ``column``."""
        value = self.get_valid_float_for_field(column)
        setattr(task, column.name, value)

    def set_bool_field(self, task: Task, column: Column) -> None:
        """Stores a randomly chosen ``False``/``True`` in ``column``."""
        value = self.faker.random.choice([False, True])
        setattr(task, column.name, value)

    def set_date_field(self, task: Task, column: Column) -> None:
        """Stores a Faker-generated date object in ``column``."""
        value = self.faker.date_object()
        setattr(task, column.name, value)

    def set_datetime_field(self, task: Task, column: Column) -> None:
        """Stores a Faker-generated date/time in ``column``."""
        value = self.faker.date_time()
        setattr(task, column.name, value)

    def set_unicode_text_field(self, task: Task, column: Column) -> None:
        """Stores Faker-generated text in ``column``."""
        value = self.faker.text()
        setattr(task, column.name, value)

    def set_string_field(self, task: Task, column: Column) -> None:
        """Stores a valid random string in the task's ``column``."""
        value = self.get_valid_string_for_field(column)
        setattr(task, column.name, value)

    def get_valid_integer_for_field(self, column: Column) -> int:
        """
        Returns a random integer acceptable to ``column``.

        If the column carries a permitted-value checker, one of its
        permitted values is chosen when it lists any; otherwise its
        minimum/maximum (where set) replace the class defaults as the
        inclusive bounds.
        """
        checker = column.info.get(COLATTR_PERMITTED_VALUE_CHECKER)
        rng = self.faker.random

        if checker is None:
            return rng.randint(
                self.DEFAULT_MIN_INTEGER, self.DEFAULT_MAX_INTEGER
            )

        if checker.permitted_values is not None:
            return rng.choice(checker.permitted_values)

        lowest = (
            self.DEFAULT_MIN_INTEGER
            if checker.minimum is None
            else checker.minimum
        )
        highest = (
            self.DEFAULT_MAX_INTEGER
            if checker.maximum is None
            else checker.maximum
        )
        return rng.randint(lowest, highest)

    def get_valid_float_for_field(self, column: Column) -> float:
        """
        Returns a random float acceptable to ``column``.

        Follows the same rules as the integer version, but draws from a
        uniform distribution between the bounds.
        """
        checker = column.info.get(COLATTR_PERMITTED_VALUE_CHECKER)
        rng = self.faker.random

        if checker is None:
            return rng.uniform(self.DEFAULT_MIN_FLOAT, self.DEFAULT_MAX_FLOAT)

        if checker.permitted_values is not None:
            return rng.choice(checker.permitted_values)

        lowest = (
            self.DEFAULT_MIN_FLOAT
            if checker.minimum is None
            else checker.minimum
        )
        highest = (
            self.DEFAULT_MAX_FLOAT
            if checker.maximum is None
            else checker.maximum
        )
        return rng.uniform(lowest, highest)

    def get_valid_string_for_field(self, column: Column) -> str:
        """
        Returns a random string acceptable to ``column``.

        A permitted value is chosen if the column's checker lists any;
        otherwise Faker text is generated and cut down to the column's
        declared length (if it has one).
        """
        checker = column.info.get(COLATTR_PERMITTED_VALUE_CHECKER)
        if checker is not None and checker.permitted_values is not None:
            return self.faker.random.choice(checker.permitted_values)

        text = self.faker.text()

        column_type = column.type
        assert isinstance(column_type, String)

        max_length = column_type.length
        return text if max_length is None else text[:max_length]

222 

223 

224# ============================================================================= 

225# DummyDataFactory 

226# ============================================================================= 

227 

228 

class DummyDataFactory(DummyDataInserter):
    """
    Factory to insert random data (within constraints) to tasks and other
    objects in a dummy database. Unlike its parent, this concerns itself with
    an actual database.
    """

    FIRST_PATIENT_ID = 10001
    NUM_PATIENTS = 5

    def __init__(self, cfg: "CamcopsConfig") -> None:
        """
        Opens a session on the engine supplied by ``cfg`` and fixes the "era"
        timestamps that :meth:`apply_standard_db_fields` and
        :meth:`apply_standard_task_fields` stamp onto every record.

        Args:
            cfg: configuration object providing ``get_sqla_engine()``
        """
        super().__init__()
        engine = cfg.get_sqla_engine()
        self.dbsession = sessionmaker()(bind=engine)  # type: SqlASession

        self.era_time = pendulum.now()
        self.era_time_utc = convert_datetime_to_utc(self.era_time)
        self.era = format_datetime(self.era_time, DateFormat.ISO8601)

        # These are populated by add_data().
        self.group = None  # type: Optional[Group]
        self.user = None  # type: Optional[User]
        self.device = None  # type: Optional[Device]
        self.nhs_iddef = None  # type: Optional[IdNumDefinition]

    def add_data(self) -> None:
        """
        Populates the database: creates a dummy group, makes it the system
        user's upload group, fetches or creates the dummy device, adds a test
        NHS number ID definition, and then adds ``NUM_PATIENTS`` patients
        (IDs starting at ``FIRST_PATIENT_ID``), each with one instance of
        every task class.
        """
        # noinspection PyTypeChecker
        next_id = self.next_id(Group.id)  # type: ignore[arg-type]

        self.group = Group()
        self.group.name = f"dummygroup{next_id}"
        self.group.description = "Dummy group"
        self.group.upload_policy = "sex AND anyidnum"
        self.group.finalize_policy = "sex AND idnum1001"
        self.dbsession.add(self.group)
        self.dbsession.commit()  # sets PK fields

        self.user = User.get_system_user(self.dbsession)
        self.user.upload_group_id = self.group.id

        self.device = self.get_device(self.dbsession)
        self.dbsession.commit()

        self.nhs_iddef = IdNumDefinition(
            which_idnum=1001,
            description="NHS number (TEST)",
            short_description="NHS#",
            hl7_assigning_authority="NHS",
            hl7_id_type="NHSN",
        )
        self.dbsession.add(self.nhs_iddef)
        try:
            self.dbsession.commit()
        except IntegrityError:
            # Presumably the definition already exists (e.g. from an earlier
            # run). Everything before this point has been committed, so the
            # rollback discards only the pending definition. The in-memory
            # object is still used later for its which_idnum value.
            self.dbsession.rollback()

        for patient_id in range(
            self.FIRST_PATIENT_ID, self.FIRST_PATIENT_ID + self.NUM_PATIENTS
        ):
            # Seed Faker from the patient ID while creating the patient...
            Faker.seed(patient_id)
            self.add_patient(patient_id)
            log.info(f"Adding tasks for patient {patient_id}")

            # ... and re-seed without a fixed value for the task content.
            Faker.seed()
            self.add_tasks(patient_id)

    # noinspection PyMethodMayBeStatic
    def get_device(self, dbsession: "SqlASession") -> "Device":
        """
        Returns the device named ``dummy_device``, creating it (registered by
        the system user) and flushing it to the session if it does not exist.

        Args:
            dbsession: session used to look up and, if necessary, add the
                device

        Returns:
            the existing or newly created device
        """
        dummy_device_name = "dummy_device"

        device = Device.get_device_by_name(dbsession, dummy_device_name)
        if device is None:
            device = Device()
            device.name = dummy_device_name
            device.friendly_name = "Dummy tablet device"
            device.registered_by_user = User.get_system_user(dbsession)
            device.when_registered_utc = pendulum.DateTime.utcnow()
            device.camcops_version = CAMCOPS_SERVER_VERSION
            dbsession.add(device)
            dbsession.flush()  # So that we can use the PK elsewhere
        return device

    def add_patient(self, patient_id: int) -> Patient:
        """
        Creates, adds and commits a patient with random demographics, plus an
        NHS number record for that patient.

        Args:
            patient_id: value to use as the patient's ``id``

        Returns:
            the newly created patient
        """
        log.info(f"Adding patient {patient_id}")

        patient = Patient()

        patient.id = patient_id
        self.apply_standard_db_fields(patient)

        patient.sex = self.faker.random.choices(
            ["M", "F", "X"], weights=[49.8, 49.8, 0.4]
        )[0]

        if patient.sex == "M":
            patient.forename = self.faker.first_name_male()
        elif patient.sex == "F":
            patient.forename = self.faker.first_name_female()
        else:
            # Only the first character of a generated first name is kept.
            patient.forename = self.faker.first_name()[:1]

        patient.surname = self.faker.last_name()

        # Faker date_of_birth calculates from the current time so gives
        # different results on different days. By fixing the dates we get
        # consistent results but our population ages over time.
        patient.dob = self.faker.date_between_dates(
            date_start=pendulum.date(1900, 1, 1),
            date_end=pendulum.date(2020, 1, 1),
        )
        self.dbsession.add(patient)

        self.add_patient_idnum(patient_id)
        self.dbsession.commit()

        return patient

    # noinspection PyTypeChecker
    def add_patient_idnum(self, patient_id: int) -> None:
        """
        Adds (without committing) an NHS number record for the given patient.

        Args:
            patient_id: ID of the patient the number belongs to; it is also
                used as the RNG seed, so a given patient always gets the
                same number
        """
        next_id = self.next_id(PatientIdNum.id)  # type: ignore[arg-type]

        patient_idnum = PatientIdNum()
        patient_idnum.id = next_id
        self.apply_standard_db_fields(patient_idnum)
        patient_idnum.patient_id = patient_id
        patient_idnum.which_idnum = self.nhs_iddef.which_idnum

        # Always create the same NHS number for each patient.
        # Uses a different random object to faker.
        # Restores the master RNG state afterwards, even if number
        # generation fails, so the process-wide RNG is never left seeded
        # with a predictable value.
        old_random_state = random.getstate()
        random.seed(patient_id)
        try:
            patient_idnum.idnum_value = generate_random_nhs_number()
        finally:
            random.setstate(old_random_state)

        self.dbsession.add(patient_idnum)

    def add_tasks(self, patient_id: int) -> None:
        """
        Creates one randomly filled instance of every task class and commits
        each in turn.

        Args:
            patient_id: patient to attach to tasks that have a patient
        """
        for cls in Task.all_subclasses_by_tablename():
            task = cls()
            task.id = self.next_id(cls.id)
            self.apply_standard_task_fields(task)
            if task.has_patient:
                task.patient_id = patient_id

            self.fill_in_task_fields(task)

            self.dbsession.add(task)
            self.dbsession.commit()

    def next_id(self, column: Column) -> int:
        """
        Returns one more than the current maximum value of ``column`` in the
        database, or 1 if the query yields no maximum.
        """
        max_id = self.dbsession.query(func.max(column)).scalar()
        if max_id is None:
            return 1

        return max_id + 1

    def apply_standard_task_fields(self, task: Task) -> None:
        """
        Writes some default values to an SQLAlchemy ORM object representing
        a task.
        """
        self.apply_standard_db_fields(task)
        task.when_created = self.era_time

    def apply_standard_db_fields(
        self, obj: "GenericTabletRecordMixin"
    ) -> None:
        """
        Writes some default values to an SQLAlchemy ORM object representing a
        record uploaded from a client (tablet) device.

        Reads ``self.device``, ``self.group`` and ``self.user``, which are
        set by :meth:`add_data`.
        """
        obj._device_id = self.device.id
        obj._era = self.era
        obj._group_id = self.group.id
        obj._current = True
        obj._adding_user_id = self.user.id
        obj._when_added_batch_utc = self.era_time_utc