Coverage for cc_modules/cc_unittest.py: 52%

117 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 15:50 +0100

1""" 

2camcops_server/cc_modules/cc_unittest.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Framework and support functions for unit tests.** 

27 

28""" 

29 

30import base64 

31import copy 

32from faker import Faker 

33import logging 

34import os 

35import random 

36import sqlite3 

37from typing import Any, Dict, List, Type, TYPE_CHECKING 

38from unittest import mock, TestCase 

39 

40from cardinal_pythonlib.classes import all_subclasses 

41from cardinal_pythonlib.dbfunc import get_fieldnames_from_cursor 

42from cardinal_pythonlib.httpconst import MimeType 

43from cardinal_pythonlib.logs import BraceStyleAdapter 

44import pytest 

45from sqlalchemy.engine.base import Engine 

46 

47from camcops_server.cc_modules.cc_baseconstants import ENVVAR_CONFIG_FILE 

48from camcops_server.cc_modules.cc_device import Device 

49from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient 

50from camcops_server.cc_modules.cc_request import ( 

51 CamcopsRequest, 

52 get_unittest_request, 

53) 

54from camcops_server.cc_modules.cc_sqlalchemy import sql_from_sqlite_database 

55from camcops_server.cc_modules.cc_task import Task, TaskHasPatientMixin 

56from camcops_server.cc_modules.cc_user import User 

57from camcops_server.cc_modules.cc_testfactories import ( 

58 BaseFactory, 

59 GroupFactory, 

60 NHSPatientIdNumFactory, 

61 PatientFactory, 

62 RioPatientIdNumFactory, 

63 UserFactory, 

64 UserGroupMembershipFactory, 

65) 

66from camcops_server.tasks.tests import factories as task_factories 

67 

68if TYPE_CHECKING: 

69 from sqlalchemy.orm import Session 

70 

71log = BraceStyleAdapter(logging.getLogger(__name__)) 

72 

73 

74# ============================================================================= 

75# Constants 

76# ============================================================================= 

77 

# Raw bytes of a tiny demonstration PNG image (described by its source as a
# 1x1 black pixel), decoded once at import time from base64 text.
# Source: https://stackoverflow.com/questions/6018611
DEMO_PNG_BYTES = base64.b64decode(
    # Adjacent string literals are concatenated by the parser, so this is
    # one continuous base64 string.
    "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAA"
    "C0lEQVQYV2NgYAAAAAMAAWgmWQ0AAAAASUVORK5CYII="
)

83 

84 

85# ============================================================================= 

86# Unit testing 

87# ============================================================================= 

88 

89 

class ExtendedTestCase(TestCase):
    """
    A subclass of :class:`unittest.TestCase` that provides some additional
    functionality.

    Before every test, both Faker and Python's ``random`` module are seeded
    with the same fixed value (see :meth:`setUp`).
    """

    # Logging in unit tests:
    # https://stackoverflow.com/questions/7472863/pydev-unittesting-how-to-capture-text-logged-to-a-logging-logger-in-captured-o  # noqa
    # https://stackoverflow.com/questions/7472863/pydev-unittesting-how-to-capture-text-logged-to-a-logging-logger-in-captured-o/15969985#15969985  # noqa
    # ... but actually, my code below is simpler and works fine.

    def setUp(self) -> None:
        """
        Seeds the random number generators used by the tests.
        """
        super().setUp()

        # Arbitrary seed
        Faker.seed(1234)
        random.seed(1234)

    @classmethod
    def announce(cls, msg: str) -> None:
        """
        Logs a message to the Python log, prefixed with the module and class
        name of the test case.

        Args:
            msg: the message to log
        """
        log.info("{}.{}:{}", cls.__module__, cls.__name__, msg)

    def assertIsInstanceOrNone(
        self, obj: object, cls: Type, msg: str = None
    ) -> None:
        """
        Asserts that ``obj`` is an instance of ``cls`` or is None. The
        parameter ``msg`` is used as part of the failure message if it isn't.

        Args:
            obj: the object to check
            cls: the class that ``obj`` must be an instance of, if not None
            msg: optional message passed on to ``assertIsInstance``
        """
        if obj is None:
            return
        self.assertIsInstance(obj, cls, msg)

126 

127 

@pytest.mark.usefixtures("setup")
class DemoRequestTestCase(ExtendedTestCase):
    """
    Test case that creates a demo Pyramid request that refers to a database.
    See server/camcops_server/conftest.py

    The attributes annotated below are read by this class but never assigned
    here.
    NOTE(review): presumably the ``setup`` fixture named in the decorator
    provides them -- confirm in conftest.py.
    """

    dbsession: "Session"
    config_file: str
    engine: Engine
    database_on_disk: bool
    db_filename: str

    def setUp(self) -> None:
        """
        Points every factory at the test database session, builds the demo
        request, and takes a copy of the request's config so that
        :meth:`tearDown` can restore it.
        """
        super().setUp()

        for factory in all_subclasses(BaseFactory):
            factory._meta.sqlalchemy_session = self.dbsession

        # config file has already been set up for the session in conftest.py
        os.environ[ENVVAR_CONFIG_FILE] = self.config_file
        self.req = get_unittest_request(self.dbsession)

        # request.config is a class property. We want to be able to override
        # config settings in a test by setting them directly on the config
        # object (e.g. self.req.config.foo = "bar"), then restore the defaults
        # afterwards.
        self.old_config = copy.copy(self.req.config)

        self.req.matched_route = mock.Mock()
        self.recipdef = ExportRecipient()

    def tearDown(self) -> None:
        """
        Restores the config copy taken in :meth:`setUp`, then runs any
        teardown defined further up the class hierarchy.
        """
        CamcopsRequest.config = self.old_config  # type: ignore[method-assign]
        super().tearDown()

    def set_echo(self, echo: bool) -> None:
        """
        Changes the database echo status.

        Args:
            echo: new value for the engine's ``echo`` attribute
        """
        self.engine.echo = echo

    def dump_database(self, loglevel: int = logging.INFO) -> None:
        """
        Writes the test SQLite database to the logging stream.

        This only works when the database is a file on disk
        (``database_on_disk``); otherwise a warning is logged and nothing is
        dumped.

        Args:
            loglevel: log level to use
        """
        if not self.database_on_disk:
            log.warning("Cannot dump database (use database_on_disk for that)")
            return
        log.info("Dumping database; please wait...")
        connection = sqlite3.connect(self.db_filename)
        try:
            sql_text = sql_from_sqlite_database(connection)
        finally:
            # Always release the connection, even if the dump fails.
            connection.close()
        log.log(loglevel, "SQLite database:\n{}", sql_text)

    def dump_table(
        self,
        tablename: str,
        column_names: List[str] = None,
        loglevel: int = logging.INFO,
    ) -> None:
        """
        Writes one table of the test SQLite database to the logging stream,
        as comma-separated text with a header row of field names.

        This only works when the database is a file on disk
        (``database_on_disk``); otherwise a warning is logged and nothing is
        dumped.

        Args:
            tablename: table to dump
            column_names: column names to dump, or ``None`` for all
            loglevel: log level to use
        """
        if not self.database_on_disk:
            log.warning("Cannot dump database (use database_on_disk for that)")
            return
        connection = sqlite3.connect(self.db_filename)
        try:
            cursor = connection.cursor()
            columns = ",".join(column_names) if column_names else "*"
            # Identifiers are interpolated directly into the SQL, so callers
            # must only pass trusted table/column names.
            sql = f"SELECT {columns} FROM {tablename}"
            cursor.execute(sql)
            # noinspection PyTypeChecker
            fieldnames = get_fieldnames_from_cursor(cursor)  # type: ignore[arg-type]  # noqa: E501
            results = (
                ",".join(fieldnames)
                + "\n"
                + "\n".join(
                    ",".join(str(value) for value in row)
                    for row in cursor.fetchall()
                )
            )
        finally:
            # Always release the connection, even if the query fails.
            connection.close()
        log.log(loglevel, "Contents of table {}:\n{}", tablename, results)

220 

221 

class BasicDatabaseTestCase(DemoRequestTestCase):
    """
    Demo-request test case that also populates the database with a minimal
    set of records: a group, a group administrator, a superuser, the system
    user and the server device.
    """

    def setUp(self) -> None:
        """
        Creates the minimal records and commits them to the test session.
        """
        super().setUp()

        session = self.dbsession

        # Keep the factory calls in this order: the random generators are
        # seeded, so the order of creation may affect the generated values.
        self.group = group = GroupFactory()
        self.groupadmin = group_admin = UserFactory()
        self.superuser = superuser = UserFactory(superuser=True)

        # Make the (non-superuser) user an administrator of the group.
        UserGroupMembershipFactory(
            group_id=group.id,
            user_id=group_admin.id,
            groupadmin=True,
        )

        system_user = User.get_system_user(session)
        self.system_user = system_user
        system_user.upload_group_id = group.id

        # improve our debugging user
        self.req._debugging_user = superuser

        self.server_device = Device.get_server_device(session)
        session.commit()

246 

247 

class DemoDatabaseTestCase(BasicDatabaseTestCase):
    """
    Test case that builds a demonstration CamCOPS database holding two
    patients and two tasks of every task type.
    """

    def setUp(self) -> None:
        """
        Creates the demo group, two patients (one with two ID numbers, one
        with a single ID number) and, for every task class, two task
        instances built by the matching factory in ``task_factories``.
        """
        super().setUp()

        demo_group = GroupFactory()
        self.demo_database_group = demo_group

        patient_with_two_idnums = PatientFactory(_group=demo_group)
        NHSPatientIdNumFactory(patient=patient_with_two_idnums)
        RioPatientIdNumFactory(patient=patient_with_two_idnums)

        patient_with_one_idnum = PatientFactory(_group=demo_group)
        NHSPatientIdNumFactory(patient=patient_with_one_idnum)

        for task_class in Task.all_subclasses_by_tablename():
            # The factory is looked up by name: "<TaskClassName>Factory".
            factory_name = f"{task_class.__name__}Factory"
            make_task = getattr(task_factories, factory_name)

            # One set of keyword arguments per task instance.
            first_kwargs: Dict[str, Any] = {"_group": demo_group}
            second_kwargs: Dict[str, Any] = dict(first_kwargs)

            # Tasks that belong to a patient get one patient each.
            if issubclass(task_class, TaskHasPatientMixin):
                first_kwargs["patient"] = patient_with_two_idnums
                second_kwargs["patient"] = patient_with_one_idnum

            # The Photo task also needs a blob; both instances get the same
            # demonstration PNG.
            if task_class.__name__ == "Photo":
                for task_kwargs in (first_kwargs, second_kwargs):
                    task_kwargs.update(
                        create_blob__fieldname="photo_blobid",
                        create_blob__filename="some_picture.png",
                        create_blob__mimetype=MimeType.PNG,
                        create_blob__image_rotation_deg_cw=0,
                        create_blob__theblob=DEMO_PNG_BYTES,
                    )

            make_task(**first_kwargs)
            make_task(**second_kwargs)

291 factory_class(**t2_kwargs)