Coverage for cc_modules/tests/cc_sqla_coltypes_tests.py: 35%

88 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-30 13:48 +0000

1""" 

2camcops_server/cc_modules/tests/cc_sqla_coltypes_tests.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25""" 

26 

27import datetime 

28from typing import Union 

29 

30from cardinal_pythonlib.datetimefunc import coerce_to_pendulum 

31import pendulum 

32from pendulum import DateTime as Pendulum, Duration 

33import phonenumbers 

34from semantic_version import Version 

35from sqlalchemy import insert 

36from sqlalchemy.sql.expression import select 

37from sqlalchemy.sql.functions import func 

38from sqlalchemy.sql.schema import Column 

39from sqlalchemy.sql.sqltypes import DateTime, Integer 

40 

41from camcops_server.cc_modules.cc_sqla_coltypes import ( 

42 isotzdatetime_to_utcdatetime, 

43 PendulumDateTimeAsIsoTextColType, 

44 PendulumDurationAsIsoTextColType, 

45 PhoneNumberColType, 

46 SemanticVersionColType, 

47 unknown_field_to_utcdatetime, 

48) 

49from camcops_server.cc_modules.cc_sqlalchemy import Base 

50from camcops_server.cc_modules.cc_unittest import DemoRequestTestCase 

51 

52 

53# ============================================================================= 

54# Unit testing 

55# ============================================================================= 

56class TestColType(Base): 

57 __tablename__ = "test_coltype" 

58 

59 id = Column("id", Integer, primary_key=True) 

60 dt_local = Column("dt_local", DateTime) 

61 dt_utc = Column("dt_utc", DateTime) 

62 dt_iso = Column("dt_iso", PendulumDateTimeAsIsoTextColType) 

63 duration_iso = Column("duration_iso", PendulumDurationAsIsoTextColType) 

64 version = Column("version", SemanticVersionColType) 

65 phone_number = Column("phone_number", PhoneNumberColType) 

66 

67 

68class SqlaColtypesTest(DemoRequestTestCase): 

69 def _assert_dt_equal( 

70 self, 

71 a: Union[datetime.datetime, Pendulum], 

72 b: Union[datetime.datetime, Pendulum], 

73 ) -> None: 

74 # Accept that one may have been truncated or rounded to milliseconds. 

75 a = coerce_to_pendulum(a) 

76 b = coerce_to_pendulum(b) 

77 diff = a - b 

78 

79 self.assertTrue(diff.microseconds < 1000, msg=f"{a!r} != {b!r}") 

80 

81 def test_iso_datetime_field(self) -> None: 

82 now = Pendulum.now() 

83 now_utc = now.in_tz(pendulum.UTC) 

84 yesterday = now.subtract(days=1) 

85 yesterday_utc = yesterday.in_tz(pendulum.UTC) 

86 

87 table = TestColType.__table__ 

88 

89 self.dbsession.execute( 

90 insert(table).values( 

91 [ 

92 { 

93 "id": 1, 

94 "dt_local": now, 

95 "dt_utc": now_utc, 

96 "dt_iso": now, 

97 }, 

98 { 

99 "id": 2, 

100 "dt_local": yesterday, 

101 "dt_utc": yesterday_utc, 

102 "dt_iso": yesterday, 

103 }, 

104 ] 

105 ) 

106 ) 

107 

108 statement = ( 

109 select( 

110 table.c.id, 

111 table.c.dt_local, 

112 table.c.dt_utc, 

113 table.c.dt_iso, 

114 func.length(table.c.dt_local).label("len_dt_local_col"), 

115 func.length(table.c.dt_utc).label("len_dt_utc_col"), 

116 func.length(table.c.dt_iso).label("len_iso_col"), 

117 isotzdatetime_to_utcdatetime(table.c.dt_iso).label( 

118 "iso_to_utcdt" 

119 ), 

120 unknown_field_to_utcdatetime(table.c.dt_utc).label( 

121 "uk_utcdt_to_utcdt" 

122 ), 

123 unknown_field_to_utcdatetime(table.c.dt_iso).label( 

124 "uk_iso_to_utc_dt" 

125 ), 

126 ) 

127 .select_from(table) 

128 .order_by(table.c.id) 

129 ) 

130 

131 rows = list(self.dbsession.execute(statement).mappings()) 

132 

133 self._assert_dt_equal(rows[0].dt_local, now) 

134 self._assert_dt_equal(rows[0].dt_utc, now_utc) 

135 self._assert_dt_equal(rows[0].dt_iso, now) 

136 self._assert_dt_equal(rows[0]["iso_to_utcdt"], now_utc) 

137 self._assert_dt_equal(rows[0]["uk_utcdt_to_utcdt"], now_utc) 

138 self._assert_dt_equal(rows[0]["uk_iso_to_utc_dt"], now_utc) 

139 

140 self._assert_dt_equal(rows[1].dt_local, yesterday) 

141 self._assert_dt_equal(rows[1].dt_utc, yesterday_utc) 

142 self._assert_dt_equal(rows[1].dt_iso, yesterday) 

143 self._assert_dt_equal(rows[1]["iso_to_utcdt"], yesterday_utc) 

144 self._assert_dt_equal(rows[1]["uk_utcdt_to_utcdt"], yesterday_utc) 

145 self._assert_dt_equal(rows[1]["uk_iso_to_utc_dt"], yesterday_utc) 

146 

147 def _assert_duration_equal(self, a: Duration, b: Duration) -> None: 

148 self.assertEqual(a.total_seconds(), b.total_seconds()) 

149 

150 def test_iso_duration_field(self) -> None: 

151 d1 = Duration(years=1, months=3, seconds=3, microseconds=4) 

152 d2 = Duration(seconds=987.654321) 

153 d3 = Duration(days=-5) 

154 

155 table = TestColType.__table__ 

156 

157 self.dbsession.execute( 

158 insert(table).values( 

159 [ 

160 {"id": 1, "duration_iso": d1}, 

161 {"id": 2, "duration_iso": d2}, 

162 {"id": 3, "duration_iso": d3}, 

163 ] 

164 ) 

165 ) 

166 

167 statement = ( 

168 select(table.c.id, table.c.duration_iso) 

169 .select_from(table) 

170 .order_by(table.c.id) 

171 ) 

172 

173 rows = list(self.dbsession.execute(statement).mappings()) 

174 

175 self._assert_duration_equal(rows[0].duration_iso, d1) 

176 self._assert_duration_equal(rows[1].duration_iso, d2) 

177 self._assert_duration_equal(rows[2].duration_iso, d3) 

178 

179 def test_semantic_version_field(self) -> None: 

180 v1 = Version("1.1.0") 

181 v2 = Version("2.0.1") 

182 v3 = Version("14.0.0") 

183 

184 table = TestColType.__table__ 

185 

186 self.dbsession.execute( 

187 insert(table).values( 

188 [ 

189 {"id": 1, "version": v1}, 

190 {"id": 2, "version": v2}, 

191 {"id": 3, "version": v3}, 

192 ] 

193 ) 

194 ) 

195 

196 statement = ( 

197 select(table.c.id, table.c.version) 

198 .select_from(table) 

199 .order_by(table.c.id) 

200 ) 

201 

202 rows = list(self.dbsession.execute(statement).mappings()) 

203 

204 self.assertEqual(rows[0]["version"], v1) 

205 self.assertEqual(rows[1]["version"], v2) 

206 self.assertEqual(rows[2]["version"], v3) 

207 

208 def test_phone_number_field(self) -> None: 

209 # https://en.wikipedia.org/wiki/Fictitious_telephone_number 

210 p1 = phonenumbers.parse("+44 (0)113 496 0123") 

211 p2 = phonenumbers.parse("+33 1 99 00 12 34 56") 

212 p3 = phonenumbers.parse("07700 900123", "GB") 

213 p4 = None 

214 

215 table = TestColType.__table__ 

216 

217 self.dbsession.execute( 

218 insert(table).values( 

219 [ 

220 {"id": 1, "phone_number": p1}, 

221 {"id": 2, "phone_number": p2}, 

222 {"id": 3, "phone_number": p3}, 

223 {"id": 4, "phone_number": p4}, 

224 ] 

225 ) 

226 ) 

227 

228 statement = ( 

229 select(table.c.id, table.c.phone_number) 

230 .select_from(table) 

231 .order_by(table.c.id) 

232 ) 

233 

234 rows = list(self.dbsession.execute(statement).mappings()) 

235 

236 self.assertEqual(rows[0]["phone_number"], p1) 

237 self.assertEqual(rows[1]["phone_number"], p2) 

238 self.assertEqual(rows[2]["phone_number"], p3) 

239 self.assertIsNone(rows[3]["phone_number"])