Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2 

3""" 

4camcops_server/cc_modules/cc_dummy_database.py 

5 

6=============================================================================== 

7 

8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com). 

9 

10 This file is part of CamCOPS. 

11 

12 CamCOPS is free software: you can redistribute it and/or modify 

13 it under the terms of the GNU General Public License as published by 

14 the Free Software Foundation, either version 3 of the License, or 

15 (at your option) any later version. 

16 

17 CamCOPS is distributed in the hope that it will be useful, 

18 but WITHOUT ANY WARRANTY; without even the implied warranty of 

19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20 GNU General Public License for more details. 

21 

22 You should have received a copy of the GNU General Public License 

23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

24 

25=============================================================================== 

26 

27**Functions for dummy database creation for manual testing.** 

28 

29""" 

30 

31import logging 

32import random 

33from typing import Optional, TYPE_CHECKING 

34 

35from cardinal_pythonlib.datetimefunc import ( 

36 convert_datetime_to_utc, 

37 format_datetime, 

38) 

39from cardinal_pythonlib.logs import BraceStyleAdapter 

40from cardinal_pythonlib.nhs import generate_random_nhs_number 

41from faker import Faker 

42import pendulum 

43from sqlalchemy.exc import IntegrityError 

44from sqlalchemy.orm.session import sessionmaker 

45from sqlalchemy.sql.expression import func 

46from sqlalchemy.sql.schema import Column 

47from sqlalchemy.sql.sqltypes import Boolean, Date, Float, Integer, UnicodeText 

48 

49from camcops_server.cc_modules.cc_constants import DateFormat 

50from camcops_server.cc_modules.cc_device import Device 

51from camcops_server.cc_modules.cc_group import Group 

52from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition 

53from camcops_server.cc_modules.cc_patient import Patient 

54from camcops_server.cc_modules.cc_patientidnum import PatientIdNum 

55from camcops_server.cc_modules.cc_sqla_coltypes import ( 

56 PendulumDateTimeAsIsoTextColType, 

57) 

58 

59from camcops_server.cc_modules.cc_task import Task 

60from camcops_server.cc_modules.cc_user import User 

61from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION 

62 

63 

64if TYPE_CHECKING: 

65 from sqlalchemy.orm import Session as SqlASession 

66 from camcops_server.cc_modules.cc_config import CamcopsConfig 

67 from camcops_server.cc_modules.cc_db import GenericTabletRecordMixin 

68 

69log = BraceStyleAdapter(logging.getLogger(__name__)) 

70 

71 

72class DummyDataFactory(object): 

73 FIRST_PATIENT_ID = 10001 

74 NUM_PATIENTS = 5 

75 

76 DEFAULT_MIN_FLOAT = 0 

77 DEFAULT_MAX_FLOAT = 1000 

78 

79 DEFAULT_MIN_INTEGER = 0 

80 DEFAULT_MAX_INTEGER = 1000 

81 

82 def __init__(self, cfg: "CamcopsConfig") -> None: 

83 engine = cfg.get_sqla_engine() 

84 self.dbsession = sessionmaker()(bind=engine) # type: SqlASession 

85 

86 self.faker = Faker('en_GB') 

87 

88 self.era_time = pendulum.now() 

89 self.era_time_utc = convert_datetime_to_utc(self.era_time) 

90 self.era = format_datetime(self.era_time, DateFormat.ISO8601) 

91 

92 self.group = None # type: Optional[Group] 

93 self.user = None # type: Optional[User] 

94 self.device = None # type: Optional[Device] 

95 self.nhs_iddef = None # type: Optional[IdNumDefinition] 

96 

97 def add_data(self) -> None: 

98 # noinspection PyTypeChecker 

99 next_id = self.next_id(Group.id) 

100 

101 self.group = Group() 

102 self.group.name = f"dummygroup {next_id}" 

103 self.group.description = "Dummy group" 

104 self.group.upload_policy = "sex AND anyidnum" 

105 self.group.finalize_policy = "sex AND idnum1001" 

106 self.dbsession.add(self.group) 

107 self.dbsession.commit() # sets PK fields 

108 

109 self.user = User.get_system_user(self.dbsession) 

110 self.user.upload_group_id = self.group.id 

111 

112 self.device = self.get_device(self.dbsession) 

113 self.dbsession.commit() 

114 

115 self.nhs_iddef = IdNumDefinition(which_idnum=1001, 

116 description="NHS number (TEST)", 

117 short_description="NHS#", 

118 hl7_assigning_authority="NHS", 

119 hl7_id_type="NHSN") 

120 self.dbsession.add(self.nhs_iddef) 

121 try: 

122 self.dbsession.commit() 

123 except IntegrityError: 

124 self.dbsession.rollback() 

125 

126 for patient_id in range(self.FIRST_PATIENT_ID, 

127 self.FIRST_PATIENT_ID + self.NUM_PATIENTS): 

128 Faker.seed(patient_id) 

129 self.add_patient(patient_id) 

130 log.info(f"Adding tasks for patient {patient_id}") 

131 

132 Faker.seed() 

133 self.add_tasks(patient_id) 

134 

135 # noinspection PyMethodMayBeStatic 

136 def get_device(self, dbsession: "SqlASession") -> "Device": 

137 dummy_device_name = "dummy_device" 

138 

139 device = Device.get_device_by_name(dbsession, dummy_device_name) 

140 if device is None: 

141 device = Device() 

142 device.name = dummy_device_name 

143 device.friendly_name = "Dummy tablet device" 

144 device.registered_by_user = User.get_system_user(dbsession) 

145 device.when_registered_utc = pendulum.DateTime.utcnow() 

146 device.camcops_version = CAMCOPS_SERVER_VERSION 

147 dbsession.add(device) 

148 dbsession.flush() # So that we can use the PK elsewhere 

149 return device 

150 

151 def add_patient(self, patient_id: int) -> Patient: 

152 log.info(f"Adding patient {patient_id}") 

153 

154 patient = Patient() 

155 

156 patient.id = patient_id 

157 self.apply_standard_db_fields(patient) 

158 

159 patient.sex = self.faker.random.choices( 

160 ["M", "F", "X"], 

161 weights=[49.8, 49.8, 0.4] 

162 )[0] 

163 

164 if patient.sex == "M": 

165 patient.forename = self.faker.first_name_male() 

166 elif patient.sex == "F": 

167 patient.forename = self.faker.first_name_female() 

168 else: 

169 patient.forename = self.faker.first_name()[:1] 

170 

171 patient.surname = self.faker.last_name() 

172 

173 # Faker date_of_birth calculates from the current time so gives 

174 # different results on different days. By fixing the dates we get 

175 # consistent results but our population ages over time. 

176 patient.dob = self.faker.date_between_dates( 

177 date_start=pendulum.date(1900, 1, 1), 

178 date_end=pendulum.date(2020, 1, 1) 

179 ) 

180 self.dbsession.add(patient) 

181 

182 self.add_patient_idnum(patient_id) 

183 self.dbsession.commit() 

184 

185 return patient 

186 

187 # noinspection PyTypeChecker 

188 def add_patient_idnum(self, patient_id: int) -> None: 

189 next_id = self.next_id(PatientIdNum.id) 

190 

191 patient_idnum = PatientIdNum() 

192 patient_idnum.id = next_id 

193 self.apply_standard_db_fields(patient_idnum) 

194 patient_idnum.patient_id = patient_id 

195 patient_idnum.which_idnum = self.nhs_iddef.which_idnum 

196 

197 # Always create the same NHS number for each patient. 

198 # Uses a different random object to faker. 

199 # Restores the master RNG state afterwards. 

200 old_random_state = random.getstate() 

201 random.seed(patient_id) 

202 patient_idnum.idnum_value = generate_random_nhs_number() 

203 random.setstate(old_random_state) 

204 

205 self.dbsession.add(patient_idnum) 

206 

207 def add_tasks(self, patient_id: int): 

208 for cls in Task.all_subclasses_by_tablename(): 

209 task = cls() 

210 task.id = self.next_id(cls.id) 

211 self.apply_standard_task_fields(task) 

212 if task.has_patient: 

213 task.patient_id = patient_id 

214 

215 self.fill_in_task_fields(task) 

216 

217 self.dbsession.add(task) 

218 self.dbsession.commit() 

219 

220 def fill_in_task_fields(self, task: Task) -> None: 

221 # noinspection PyUnresolvedReferences 

222 for column in task.__table__.columns: 

223 if not self.column_is_q_field(column): 

224 continue 

225 

226 if isinstance(column.type, Integer): 

227 self.set_integer_field(task, column) 

228 continue 

229 

230 if isinstance(column.type, Float): 

231 self.set_float_field(task, column) 

232 continue 

233 

234 if isinstance(column.type, Boolean): 

235 self.set_bool_field(task, column) 

236 continue 

237 

238 if isinstance(column.type, Date): 

239 self.set_date_field(task, column) 

240 continue 

241 

242 if isinstance(column.type, PendulumDateTimeAsIsoTextColType): 

243 self.set_datetime_field(task, column) 

244 continue 

245 

246 if isinstance(column.type, UnicodeText): 

247 self.set_unicode_text_field(task, column) 

248 

249 def set_integer_field(self, task: Task, column: Column) -> None: 

250 setattr(task, column.name, self.get_valid_integer_for_field(column)) 

251 

252 def set_float_field(self, task: Task, column: Column) -> None: 

253 setattr(task, column.name, self.get_valid_float_for_field(column)) 

254 

255 def set_bool_field(self, task: Task, column: Column) -> None: 

256 setattr(task, column.name, self.faker.random.choice([False, True])) 

257 

258 def set_date_field(self, task: Task, column: Column) -> None: 

259 setattr(task, column.name, self.faker.date_object()) 

260 

261 def set_datetime_field(self, task: Task, column: Column) -> None: 

262 setattr(task, column.name, self.faker.date_time()) 

263 

264 def set_unicode_text_field(self, task: Task, column: Column) -> None: 

265 setattr(task, column.name, self.faker.text()) 

266 

267 def get_valid_integer_for_field(self, column: Column) -> int: 

268 min_value = self.DEFAULT_MIN_INTEGER 

269 max_value = self.DEFAULT_MAX_INTEGER 

270 

271 value_checker = getattr(column, "permitted_value_checker", None) 

272 

273 if value_checker is not None: 

274 if value_checker.permitted_values is not None: 

275 return self.faker.random.choice(value_checker.permitted_values) 

276 

277 if value_checker.minimum is not None: 

278 min_value = value_checker.minimum 

279 

280 if value_checker.maximum is not None: 

281 max_value = value_checker.maximum 

282 

283 return self.faker.random.randint(min_value, max_value) 

284 

285 def get_valid_float_for_field(self, column: Column) -> float: 

286 min_value = self.DEFAULT_MIN_FLOAT 

287 max_value = self.DEFAULT_MAX_FLOAT 

288 

289 value_checker = getattr(column, "permitted_value_checker", None) 

290 

291 if value_checker is not None: 

292 if value_checker.permitted_values is not None: 

293 return self.faker.random.choice(value_checker.permitted_values) 

294 

295 if value_checker.minimum is not None: 

296 min_value = value_checker.minimum 

297 

298 if value_checker.maximum is not None: 

299 max_value = value_checker.maximum 

300 

301 return self.faker.random.uniform(min_value, max_value) 

302 

303 @staticmethod 

304 def column_is_q_field(column: Column) -> bool: 

305 if column.name.startswith("_"): 

306 return False 

307 

308 if column.name in [ 

309 'editing_time_s', 

310 'firstexit_is_abort', 

311 'firstexit_is_finish', 

312 'id', 

313 'patient_id', 

314 'when_created', 

315 'when_firstexit', 

316 'when_last_modified', 

317 ]: 

318 return False 

319 

320 return True 

321 

322 def next_id(self, column: Column) -> int: 

323 max_id = self.dbsession.query(func.max(column)).scalar() 

324 if max_id is None: 

325 return 1 

326 

327 return max_id + 1 

328 

329 def apply_standard_task_fields(self, task: Task) -> None: 

330 """ 

331 Writes some default values to an SQLAlchemy ORM object representing 

332 a task. 

333 """ 

334 self.apply_standard_db_fields(task) 

335 task.when_created = self.era_time 

336 

337 def apply_standard_db_fields(self, 

338 obj: "GenericTabletRecordMixin") -> None: 

339 """ 

340 Writes some default values to an SQLAlchemy ORM object representing a 

341 record uploaded from a client (tablet) device. 

342 """ 

343 obj._device_id = self.device.id 

344 obj._era = self.era 

345 obj._group_id = self.group.id 

346 obj._current = True 

347 obj._adding_user_id = self.user.id 

348 obj._when_added_batch_utc = self.era_time_utc