Coverage for cc_modules/cc_dummy_database.py: 26%
222 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
1"""
2camcops_server/cc_modules/cc_dummy_database.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Functions for dummy database creation for manual testing.**
28"""
30import logging
31import random
32from typing import Optional, TYPE_CHECKING
34from cardinal_pythonlib.datetimefunc import (
35 convert_datetime_to_utc,
36 format_datetime,
37)
38from cardinal_pythonlib.logs import BraceStyleAdapter
39from cardinal_pythonlib.nhs import generate_random_nhs_number
40from faker import Faker
41import pendulum
42from sqlalchemy.exc import IntegrityError
43from sqlalchemy.orm.session import sessionmaker
44from sqlalchemy.sql.expression import func
45from sqlalchemy.sql.schema import Column
46from sqlalchemy.sql.sqltypes import (
47 Boolean,
48 Date,
49 Float,
50 Integer,
51 String,
52 UnicodeText,
53)
55from camcops_server.cc_modules.cc_constants import DateFormat
56from camcops_server.cc_modules.cc_db import TASK_FREQUENT_AND_FK_FIELDS
57from camcops_server.cc_modules.cc_device import Device
58from camcops_server.cc_modules.cc_group import Group
59from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition
60from camcops_server.cc_modules.cc_patient import Patient
61from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
62from camcops_server.cc_modules.cc_sqla_coltypes import (
63 COLATTR_PERMITTED_VALUE_CHECKER,
64 PendulumDateTimeAsIsoTextColType,
65)
67from camcops_server.cc_modules.cc_task import Task
68from camcops_server.cc_modules.cc_user import User
69from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION
72if TYPE_CHECKING:
73 from sqlalchemy.orm import Session as SqlASession
74 from camcops_server.cc_modules.cc_config import CamcopsConfig
75 from camcops_server.cc_modules.cc_db import GenericTabletRecordMixin
77log = BraceStyleAdapter(logging.getLogger(__name__))
80# =============================================================================
81# DummyDataInserter
82# =============================================================================
85class DummyDataInserter:
86 """
87 Class to insert random data (within constraints) to tasks and other
88 objects. It does not touch an actual database, so its methods can be used
89 for free-floating items.
90 """
92 DEFAULT_MIN_FLOAT = 0
93 DEFAULT_MAX_FLOAT = 1000
95 DEFAULT_MIN_INTEGER = 0
96 DEFAULT_MAX_INTEGER = 1000
98 def __init__(self) -> None:
99 self.faker = Faker("en_GB")
101 @staticmethod
102 def column_is_q_field(column: Column) -> bool:
103 if column.name.startswith("_"):
104 return False
106 if column.name in TASK_FREQUENT_AND_FK_FIELDS:
107 # It's that or TASK_FREQUENT_FIELDS.
108 return False
110 return True
112 def fill_in_task_fields(self, task: Task) -> None:
113 """
114 Inserts random data into a task (within any known constraints).
115 """
116 # noinspection PyUnresolvedReferences
117 for column in task.__table__.columns:
118 if not self.column_is_q_field(column): # type: ignore[arg-type]
119 continue
121 if isinstance(column.type, Integer):
122 self.set_integer_field(task, column) # type: ignore[arg-type]
123 continue
125 if isinstance(column.type, Float):
126 self.set_float_field(task, column) # type: ignore[arg-type]
127 continue
129 if isinstance(column.type, Boolean):
130 self.set_bool_field(task, column) # type: ignore[arg-type]
131 continue
133 if isinstance(column.type, Date):
134 self.set_date_field(task, column) # type: ignore[arg-type]
135 continue
137 if isinstance(column.type, PendulumDateTimeAsIsoTextColType):
138 self.set_datetime_field(task, column) # type: ignore[arg-type]
139 continue
141 if isinstance(column.type, UnicodeText):
142 self.set_unicode_text_field(task, column) # type: ignore[arg-type] # noqa: E501
143 continue
145 if isinstance(column.type, String):
146 # covers String, Text, UnicodeText (but handled above)
147 self.set_string_field(task, column) # type: ignore[arg-type]
149 def set_integer_field(self, task: Task, column: Column) -> None:
150 setattr(task, column.name, self.get_valid_integer_for_field(column))
152 def set_float_field(self, task: Task, column: Column) -> None:
153 setattr(task, column.name, self.get_valid_float_for_field(column))
155 def set_bool_field(self, task: Task, column: Column) -> None:
156 setattr(task, column.name, self.faker.random.choice([False, True]))
158 def set_date_field(self, task: Task, column: Column) -> None:
159 setattr(task, column.name, self.faker.date_object())
161 def set_datetime_field(self, task: Task, column: Column) -> None:
162 setattr(task, column.name, self.faker.date_time())
164 def set_unicode_text_field(self, task: Task, column: Column) -> None:
165 setattr(task, column.name, self.faker.text())
167 def set_string_field(self, task: Task, column: Column) -> None:
168 setattr(task, column.name, self.get_valid_string_for_field(column))
170 def get_valid_integer_for_field(self, column: Column) -> int:
171 min_value = self.DEFAULT_MIN_INTEGER
172 max_value = self.DEFAULT_MAX_INTEGER
174 value_checker = column.info.get(COLATTR_PERMITTED_VALUE_CHECKER)
176 if value_checker is not None:
177 if value_checker.permitted_values is not None:
178 return self.faker.random.choice(value_checker.permitted_values)
180 if value_checker.minimum is not None:
181 min_value = value_checker.minimum
183 if value_checker.maximum is not None:
184 max_value = value_checker.maximum
186 return self.faker.random.randint(min_value, max_value)
188 def get_valid_float_for_field(self, column: Column) -> float:
189 min_value = self.DEFAULT_MIN_FLOAT
190 max_value = self.DEFAULT_MAX_FLOAT
192 value_checker = column.info.get(COLATTR_PERMITTED_VALUE_CHECKER)
194 if value_checker is not None:
195 if value_checker.permitted_values is not None:
196 return self.faker.random.choice(value_checker.permitted_values)
198 if value_checker.minimum is not None:
199 min_value = value_checker.minimum
201 if value_checker.maximum is not None:
202 max_value = value_checker.maximum
204 return self.faker.random.uniform(min_value, max_value)
206 def get_valid_string_for_field(self, column: Column) -> str:
207 value_checker = column.info.get(COLATTR_PERMITTED_VALUE_CHECKER)
209 if value_checker is not None:
210 if value_checker.permitted_values is not None:
211 return self.faker.random.choice(value_checker.permitted_values)
212 text = self.faker.text()
214 column_type = column.type
216 assert isinstance(column_type, String)
218 if column_type.length is None:
219 return text
221 return text[: column_type.length]
224# =============================================================================
225# DummyDataFactory
226# =============================================================================
229class DummyDataFactory(DummyDataInserter):
230 """
231 Factory to insert random data (within constraints) to tasks and other
232 objects in a dummy database. Unlike its parent, this concerns itself with
233 an actual data.
234 """
236 FIRST_PATIENT_ID = 10001
237 NUM_PATIENTS = 5
239 def __init__(self, cfg: "CamcopsConfig") -> None:
240 super().__init__()
241 engine = cfg.get_sqla_engine()
242 self.dbsession = sessionmaker()(bind=engine) # type: SqlASession
244 self.era_time = pendulum.now()
245 self.era_time_utc = convert_datetime_to_utc(self.era_time)
246 self.era = format_datetime(self.era_time, DateFormat.ISO8601)
248 self.group = None # type: Optional[Group]
249 self.user = None # type: Optional[User]
250 self.device = None # type: Optional[Device]
251 self.nhs_iddef = None # type: Optional[IdNumDefinition]
253 def add_data(self) -> None:
254 # noinspection PyTypeChecker
255 next_id = self.next_id(Group.id) # type: ignore[arg-type]
257 self.group = Group()
258 self.group.name = f"dummygroup{next_id}"
259 self.group.description = "Dummy group"
260 self.group.upload_policy = "sex AND anyidnum"
261 self.group.finalize_policy = "sex AND idnum1001"
262 self.dbsession.add(self.group)
263 self.dbsession.commit() # sets PK fields
265 self.user = User.get_system_user(self.dbsession)
266 self.user.upload_group_id = self.group.id
268 self.device = self.get_device(self.dbsession)
269 self.dbsession.commit()
271 self.nhs_iddef = IdNumDefinition(
272 which_idnum=1001,
273 description="NHS number (TEST)",
274 short_description="NHS#",
275 hl7_assigning_authority="NHS",
276 hl7_id_type="NHSN",
277 )
278 self.dbsession.add(self.nhs_iddef)
279 try:
280 self.dbsession.commit()
281 except IntegrityError:
282 self.dbsession.rollback()
284 for patient_id in range(
285 self.FIRST_PATIENT_ID, self.FIRST_PATIENT_ID + self.NUM_PATIENTS
286 ):
287 Faker.seed(patient_id)
288 self.add_patient(patient_id)
289 log.info(f"Adding tasks for patient {patient_id}")
291 Faker.seed()
292 self.add_tasks(patient_id)
294 # noinspection PyMethodMayBeStatic
295 def get_device(self, dbsession: "SqlASession") -> "Device":
296 dummy_device_name = "dummy_device"
298 device = Device.get_device_by_name(dbsession, dummy_device_name)
299 if device is None:
300 device = Device()
301 device.name = dummy_device_name
302 device.friendly_name = "Dummy tablet device"
303 device.registered_by_user = User.get_system_user(dbsession)
304 device.when_registered_utc = pendulum.DateTime.utcnow()
305 device.camcops_version = CAMCOPS_SERVER_VERSION
306 dbsession.add(device)
307 dbsession.flush() # So that we can use the PK elsewhere
308 return device
310 def add_patient(self, patient_id: int) -> Patient:
311 log.info(f"Adding patient {patient_id}")
313 patient = Patient()
315 patient.id = patient_id
316 self.apply_standard_db_fields(patient)
318 patient.sex = self.faker.random.choices(
319 ["M", "F", "X"], weights=[49.8, 49.8, 0.4]
320 )[0]
322 if patient.sex == "M":
323 patient.forename = self.faker.first_name_male()
324 elif patient.sex == "F":
325 patient.forename = self.faker.first_name_female()
326 else:
327 patient.forename = self.faker.first_name()[:1]
329 patient.surname = self.faker.last_name()
331 # Faker date_of_birth calculates from the current time so gives
332 # different results on different days. By fixing the dates we get
333 # consistent results but our population ages over time.
334 patient.dob = self.faker.date_between_dates(
335 date_start=pendulum.date(1900, 1, 1),
336 date_end=pendulum.date(2020, 1, 1),
337 )
338 self.dbsession.add(patient)
340 self.add_patient_idnum(patient_id)
341 self.dbsession.commit()
343 return patient
345 # noinspection PyTypeChecker
346 def add_patient_idnum(self, patient_id: int) -> None:
347 next_id = self.next_id(PatientIdNum.id) # type: ignore[arg-type]
349 patient_idnum = PatientIdNum()
350 patient_idnum.id = next_id
351 self.apply_standard_db_fields(patient_idnum)
352 patient_idnum.patient_id = patient_id
353 patient_idnum.which_idnum = self.nhs_iddef.which_idnum
355 # Always create the same NHS number for each patient.
356 # Uses a different random object to faker.
357 # Restores the master RNG state afterwards.
358 old_random_state = random.getstate()
359 random.seed(patient_id)
360 patient_idnum.idnum_value = generate_random_nhs_number()
361 random.setstate(old_random_state)
363 self.dbsession.add(patient_idnum)
365 def add_tasks(self, patient_id: int) -> None:
366 for cls in Task.all_subclasses_by_tablename():
367 task = cls()
368 task.id = self.next_id(cls.id)
369 self.apply_standard_task_fields(task)
370 if task.has_patient:
371 task.patient_id = patient_id
373 self.fill_in_task_fields(task)
375 self.dbsession.add(task)
376 self.dbsession.commit()
378 def next_id(self, column: Column) -> int:
379 max_id = self.dbsession.query(func.max(column)).scalar()
380 if max_id is None:
381 return 1
383 return max_id + 1
385 def apply_standard_task_fields(self, task: Task) -> None:
386 """
387 Writes some default values to an SQLAlchemy ORM object representing
388 a task.
389 """
390 self.apply_standard_db_fields(task)
391 task.when_created = self.era_time
393 def apply_standard_db_fields(
394 self, obj: "GenericTabletRecordMixin"
395 ) -> None:
396 """
397 Writes some default values to an SQLAlchemy ORM object representing a
398 record uploaded from a client (tablet) device.
399 """
400 obj._device_id = self.device.id
401 obj._era = self.era
402 obj._group_id = self.group.id
403 obj._current = True
404 obj._adding_user_id = self.user.id
405 obj._when_added_batch_utc = self.era_time_utc