Coverage for cc_modules/cc_dummy_database.py : 26%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
3"""
4camcops_server/cc_modules/cc_dummy_database.py
6===============================================================================
8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com).
10 This file is part of CamCOPS.
12 CamCOPS is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
17 CamCOPS is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License for more details.
22 You should have received a copy of the GNU General Public License
23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
25===============================================================================
27**Functions for dummy database creation for manual testing.**
29"""
31import logging
32import random
33from typing import Optional, TYPE_CHECKING
35from cardinal_pythonlib.datetimefunc import (
36 convert_datetime_to_utc,
37 format_datetime,
38)
39from cardinal_pythonlib.logs import BraceStyleAdapter
40from cardinal_pythonlib.nhs import generate_random_nhs_number
41from faker import Faker
42import pendulum
43from sqlalchemy.exc import IntegrityError
44from sqlalchemy.orm.session import sessionmaker
45from sqlalchemy.sql.expression import func
46from sqlalchemy.sql.schema import Column
47from sqlalchemy.sql.sqltypes import Boolean, Date, Float, Integer, UnicodeText
49from camcops_server.cc_modules.cc_constants import DateFormat
50from camcops_server.cc_modules.cc_device import Device
51from camcops_server.cc_modules.cc_group import Group
52from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition
53from camcops_server.cc_modules.cc_patient import Patient
54from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
55from camcops_server.cc_modules.cc_task import Task
56from camcops_server.cc_modules.cc_user import User
57from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION
60if TYPE_CHECKING:
61 from sqlalchemy.orm import Session as SqlASession
62 from camcops_server.cc_modules.cc_config import CamcopsConfig
63 from camcops_server.cc_modules.cc_db import GenericTabletRecordMixin
# Module-level logger for this file, wrapped in cardinal_pythonlib's BraceStyleAdapter.
65log = BraceStyleAdapter(logging.getLogger(__name__))
class DummyDataFactory(object):
    """
    Populates a CamCOPS database with dummy records for manual testing.

    Calling :meth:`add_data` creates a group, fetches or creates a dummy
    device, attempts to create an ID number definition (number 1001), and
    then adds ``NUM_PATIENTS`` patients, each with one instance of every
    task class returned by ``Task.all_subclasses_by_tablename()``.
    """
    FIRST_PATIENT_ID = 10001
    NUM_PATIENTS = 5

    DEFAULT_MIN_FLOAT = 0
    DEFAULT_MAX_FLOAT = 1000

    DEFAULT_MIN_INTEGER = 0
    DEFAULT_MAX_INTEGER = 1000

    # Column names that column_is_q_field() never treats as question fields.
    _NON_Q_FIELD_NAMES = frozenset([
        'editing_time_s',
        'firstexit_is_abort',
        'firstexit_is_finish',
        'id',
        'patient_id',
        'when_created',
        'when_firstexit',
        'when_last_modified',
    ])

    def __init__(self, cfg: "CamcopsConfig") -> None:
        """
        Args:
            cfg: configuration object; its SQLAlchemy engine is used to
                open the database session that all records are written to.
        """
        engine = cfg.get_sqla_engine()
        self.dbsession = sessionmaker()(bind=engine)  # type: SqlASession

        self.faker = Faker('en_GB')

        # A single timestamp ("era") is shared by every record we create.
        self.era_time = pendulum.now()
        self.era_time_utc = convert_datetime_to_utc(self.era_time)
        self.era = format_datetime(self.era_time, DateFormat.ISO8601)

        # These are populated by add_data().
        self.group = None  # type: Optional[Group]
        self.user = None  # type: Optional[User]
        self.device = None  # type: Optional[Device]
        self.nhs_iddef = None  # type: Optional[IdNumDefinition]

    def add_data(self) -> None:
        """
        Creates the group, device, ID number definition, patients and tasks.
        """
        # noinspection PyTypeChecker
        next_id = self.next_id(Group.id)

        self.group = Group()
        self.group.name = f"dummygroup {next_id}"
        self.group.description = "Dummy group"
        self.group.upload_policy = "sex AND anyidnum"
        self.group.finalize_policy = "sex AND idnum1001"
        self.dbsession.add(self.group)
        self.dbsession.commit()  # sets PK fields

        self.user = User.get_system_user(self.dbsession)
        self.user.upload_group_id = self.group.id

        self.device = self.get_device(self.dbsession)
        self.dbsession.commit()

        self.nhs_iddef = IdNumDefinition(which_idnum=1001,
                                         description="NHS number (TEST)",
                                         short_description="NHS#",
                                         hl7_assigning_authority="NHS",
                                         hl7_id_type="NHSN")
        self.dbsession.add(self.nhs_iddef)
        try:
            self.dbsession.commit()
        except IntegrityError:
            # NOTE(review): presumably the definition already exists from an
            # earlier run. We carry on; only self.nhs_iddef.which_idnum is
            # read later (in add_patient_idnum).
            self.dbsession.rollback()

        for patient_id in range(self.FIRST_PATIENT_ID,
                                self.FIRST_PATIENT_ID + self.NUM_PATIENTS):
            # Seed Faker with the patient ID before generating the patient.
            Faker.seed(patient_id)
            self.add_patient(patient_id)
            log.info(f"Adding tasks for patient {patient_id}")

            # Re-seed without a fixed value before generating task data.
            Faker.seed()
            self.add_tasks(patient_id)

    # noinspection PyMethodMayBeStatic
    def get_device(self, dbsession: "SqlASession") -> "Device":
        """
        Returns the device named ``dummy_device``, creating (and flushing) it
        if ``Device.get_device_by_name()`` finds none.

        Args:
            dbsession: the database session to look in / add to.
        """
        dummy_device_name = "dummy_device"

        device = Device.get_device_by_name(dbsession, dummy_device_name)
        if device is None:
            device = Device()
            device.name = dummy_device_name
            device.friendly_name = "Dummy tablet device"
            device.registered_by_user = User.get_system_user(dbsession)
            device.when_registered_utc = pendulum.DateTime.utcnow()
            device.camcops_version = CAMCOPS_SERVER_VERSION
            dbsession.add(device)
            dbsession.flush()  # So that we can use the PK elsewhere
        return device

    def add_patient(self, patient_id: int) -> "Patient":
        """
        Creates, adds and commits a patient (plus its ID number record).

        Args:
            patient_id: value written to the new patient's ``id`` field.

        Returns:
            the new patient object.
        """
        log.info(f"Adding patient {patient_id}")

        patient = Patient()

        patient.id = patient_id
        self.apply_standard_db_fields(patient)

        patient.sex = self.faker.random.choices(
            ["M", "F", "X"],
            weights=[49.8, 49.8, 0.4]
        )[0]

        if patient.sex == "M":
            patient.forename = self.faker.first_name_male()
        elif patient.sex == "F":
            patient.forename = self.faker.first_name_female()
        else:
            # Only the first character of the generated name is kept.
            patient.forename = self.faker.first_name()[:1]

        patient.surname = self.faker.last_name()

        # Faker date_of_birth calculates from the current time so gives
        # different results on different days. By fixing the dates we get
        # consistent results but our population ages over time.
        patient.dob = self.faker.date_between_dates(
            date_start=pendulum.date(1900, 1, 1),
            date_end=pendulum.date(2020, 1, 1)
        )
        self.dbsession.add(patient)

        self.add_patient_idnum(patient_id)
        self.dbsession.commit()

        return patient

    # noinspection PyTypeChecker
    def add_patient_idnum(self, patient_id: int) -> None:
        """
        Adds (without committing) an ID number record for a patient, using
        the ID number type stored in ``self.nhs_iddef``.

        Args:
            patient_id: the patient to attach the ID number to; also used as
                the seed for the generated number.
        """
        next_id = self.next_id(PatientIdNum.id)

        patient_idnum = PatientIdNum()
        patient_idnum.id = next_id
        self.apply_standard_db_fields(patient_idnum)
        patient_idnum.patient_id = patient_id
        patient_idnum.which_idnum = self.nhs_iddef.which_idnum

        # Always create the same NHS number for each patient.
        # Uses a different random object to faker.
        # Restores the master RNG state afterwards -- in a "finally" clause,
        # so the global RNG is not left deterministically seeded if number
        # generation raises.
        old_random_state = random.getstate()
        try:
            random.seed(patient_id)
            patient_idnum.idnum_value = generate_random_nhs_number()
        finally:
            random.setstate(old_random_state)

        self.dbsession.add(patient_idnum)

    def add_tasks(self, patient_id: int) -> None:
        """
        Adds and commits one instance of every task class, with its question
        fields filled in.

        Args:
            patient_id: written to ``patient_id`` of each task whose
                ``has_patient`` attribute is true.
        """
        for cls in Task.all_subclasses_by_tablename():
            task = cls()
            task.id = self.next_id(cls.id)
            self.apply_standard_task_fields(task)
            if task.has_patient:
                task.patient_id = patient_id

            self.fill_in_task_fields(task)

            self.dbsession.add(task)
            self.dbsession.commit()

    def fill_in_task_fields(self, task: "Task") -> None:
        """
        Writes a generated value to every question column of the task whose
        type is Integer, Float, Boolean, Date or UnicodeText (checked in that
        order). Other columns are left alone.
        """
        # noinspection PyUnresolvedReferences
        for column in task.__table__.columns:
            if not self.column_is_q_field(column):
                continue

            if isinstance(column.type, Integer):
                self.set_integer_field(task, column)
            elif isinstance(column.type, Float):
                self.set_float_field(task, column)
            elif isinstance(column.type, Boolean):
                self.set_bool_field(task, column)
            elif isinstance(column.type, Date):
                self.set_date_field(task, column)
            elif isinstance(column.type, UnicodeText):
                self.set_unicode_text_field(task, column)

    def set_integer_field(self, task: "Task", column: "Column") -> None:
        """
        Sets the task attribute named after the column to a valid integer.
        """
        setattr(task, column.name, self.get_valid_integer_for_field(column))

    def set_float_field(self, task: "Task", column: "Column") -> None:
        """
        Sets the task attribute named after the column to a valid float.
        """
        setattr(task, column.name, self.get_valid_float_for_field(column))

    def set_bool_field(self, task: "Task", column: "Column") -> None:
        """
        Sets the task attribute named after the column to False or True,
        chosen by Faker's random generator.
        """
        setattr(task, column.name, self.faker.random.choice([False, True]))

    def set_date_field(self, task: "Task", column: "Column") -> None:
        """
        Sets the task attribute named after the column to a Faker date.
        """
        setattr(task, column.name, self.faker.date_object())

    def set_unicode_text_field(self, task: "Task", column: "Column") -> None:
        """
        Sets the task attribute named after the column to Faker text.
        """
        setattr(task, column.name, self.faker.text())

    @staticmethod
    def _value_constraints(column: "Column", default_min, default_max):
        """
        Reads the column's optional ``permitted_value_checker`` attribute.

        Returns:
            tuple ``(permitted_values, minimum, maximum)``. If the checker
            lists permitted values, those are returned and the defaults are
            passed through untouched; otherwise ``permitted_values`` is
            ``None`` and each bound is the checker's value where set, else
            the supplied default.
        """
        value_checker = getattr(column, "permitted_value_checker", None)
        if value_checker is None:
            return None, default_min, default_max

        if value_checker.permitted_values is not None:
            return value_checker.permitted_values, default_min, default_max

        min_value = default_min
        max_value = default_max
        if value_checker.minimum is not None:
            min_value = value_checker.minimum
        if value_checker.maximum is not None:
            max_value = value_checker.maximum
        return None, min_value, max_value

    def get_valid_integer_for_field(self, column: "Column") -> int:
        """
        Returns a random value for an integer column: one of the column's
        permitted values if it defines any, otherwise an integer in the
        inclusive range set by the column's minimum/maximum (falling back to
        ``DEFAULT_MIN_INTEGER`` / ``DEFAULT_MAX_INTEGER``).
        """
        permitted_values, min_value, max_value = self._value_constraints(
            column, self.DEFAULT_MIN_INTEGER, self.DEFAULT_MAX_INTEGER)

        if permitted_values is not None:
            return self.faker.random.choice(permitted_values)

        return self.faker.random.randint(min_value, max_value)

    def get_valid_float_for_field(self, column: "Column") -> float:
        """
        Returns a random value for a float column: one of the column's
        permitted values if it defines any, otherwise a value drawn uniformly
        between the column's minimum/maximum (falling back to
        ``DEFAULT_MIN_FLOAT`` / ``DEFAULT_MAX_FLOAT``).
        """
        permitted_values, min_value, max_value = self._value_constraints(
            column, self.DEFAULT_MIN_FLOAT, self.DEFAULT_MAX_FLOAT)

        if permitted_values is not None:
            return self.faker.random.choice(permitted_values)

        return self.faker.random.uniform(min_value, max_value)

    @staticmethod
    def column_is_q_field(column: "Column") -> bool:
        """
        Should this column be filled in with generated data?

        Returns False for columns whose name starts with an underscore or is
        in the fixed exclusion list; True otherwise.
        """
        if column.name.startswith("_"):
            return False

        return column.name not in DummyDataFactory._NON_Q_FIELD_NAMES

    def next_id(self, column: "Column") -> int:
        """
        Returns one more than the current maximum of the column in the
        database, or 1 if the query yields no maximum.
        """
        max_id = self.dbsession.query(func.max(column)).scalar()
        if max_id is None:
            return 1

        return max_id + 1

    def apply_standard_task_fields(self, task: "Task") -> None:
        """
        Writes some default values to an SQLAlchemy ORM object representing
        a task.
        """
        self.apply_standard_db_fields(task)
        task.when_created = self.era_time

    def apply_standard_db_fields(self,
                                 obj: "GenericTabletRecordMixin") -> None:
        """
        Writes some default values to an SQLAlchemy ORM object representing a
        record uploaded from a client (tablet) device.

        Reads ``self.device``, ``self.group`` and ``self.user``, which are
        set by :meth:`add_data`.
        """
        obj._device_id = self.device.id
        obj._era = self.era
        obj._group_id = self.group.id
        obj._current = True
        obj._adding_user_id = self.user.id
        obj._when_added_batch_utc = self.era_time_utc