Coverage for cc_modules/cc_taskreports.py: 31%

154 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

1""" 

2camcops_server/cc_modules/cc_taskreports.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Server reports on CamCOPS tasks.** 

27 

28""" 

29 

30from collections import Counter, namedtuple 

31from operator import attrgetter 

32from typing import Any, Tuple, Type, TYPE_CHECKING 

33 

34from cardinal_pythonlib.classes import classproperty 

35from cardinal_pythonlib.sqlalchemy.core_query import ( 

36 get_rows_fieldnames_from_select, 

37) 

38from cardinal_pythonlib.sqlalchemy.sqlfunc import ( 

39 extract_month, 

40 extract_year, 

41 extract_day_of_month, 

42) 

43from sqlalchemy import cast, ColumnExpressionArgument, Integer 

44from sqlalchemy.engine import Row 

45from sqlalchemy.sql.elements import ColumnElement 

46from sqlalchemy.sql.expression import asc, desc, func, literal, select 

47 

48from camcops_server.cc_modules.cc_sqla_coltypes import ( 

49 isotzdatetime_to_utcdatetime, 

50) 

51from camcops_server.cc_modules.cc_forms import ( 

52 ReportParamSchema, 

53 ViaIndexSelector, 

54) 

55from camcops_server.cc_modules.cc_pyramid import ViewParam 

56from camcops_server.cc_modules.cc_report import Report, PlainReportType 

57from camcops_server.cc_modules.cc_reportschema import ( 

58 ByDayOfMonthSelector, 

59 ByMonthSelector, 

60 ByTaskSelector, 

61 ByUserSelector, 

62 ByYearSelector, 

63 DEFAULT_BY_DAY_OF_MONTH, 

64 DEFAULT_BY_MONTH, 

65 DEFAULT_BY_TASK, 

66 DEFAULT_BY_USER, 

67 DEFAULT_BY_YEAR, 

68) 

69 

70from camcops_server.cc_modules.cc_task import Task 

71from camcops_server.cc_modules.cc_taskindex import TaskIndexEntry 

72from camcops_server.cc_modules.cc_user import User 

73 

74if TYPE_CHECKING: 

75 from camcops_server.cc_modules.cc_request import CamcopsRequest 

76 

77 

78# ============================================================================= 

79# Parameter schema 

80# ============================================================================= 

81 

82 

class TaskCountReportSchema(ReportParamSchema):
    """
    Parameter schema for :class:`TaskCountReport`.

    Every attribute name here must match the correspondingly named
    ``ViewParam`` constant (noted beside each attribute).
    """

    # ViewParam.BY_YEAR
    by_year = ByYearSelector()
    # ViewParam.BY_MONTH
    by_month = ByMonthSelector()
    # ViewParam.BY_DAY_OF_MONTH
    by_day_of_month = ByDayOfMonthSelector()
    # ViewParam.BY_TASK
    by_task = ByTaskSelector()
    # ViewParam.BY_USER
    by_user = ByUserSelector()
    # ViewParam.VIA_INDEX
    via_index = ViaIndexSelector()

91 

92 

93# ============================================================================= 

94# Reports 

95# ============================================================================= 

96 

97 

class TaskCountReport(Report):
    """
    Report to count task instances.

    Counts may be broken down by year, month, day of month, task and adding
    user, as selected through the request parameters. Two strategies are
    available: one query against ``TaskIndexEntry``, or one query per task
    table with the results combined in Python.
    """

    # Column labels used in the SQL and as the names of the output columns.
    label_year = "year"
    label_month = "month"
    label_day_of_month = "day_of_month"
    label_task = "task"
    label_user = "adding_user_name"
    label_n = "num_tasks_added"

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """
        Identifier string for this report.
        """
        return "taskcount"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """
        Title of the report, passed through the request's ``gettext``.
        """
        _ = req.gettext
        return _("(Server) Count current task instances")

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """
        This report is not restricted to superusers.
        """
        return False

    @staticmethod
    def get_paramform_schema_class() -> Type["ReportParamSchema"]:
        """
        Schema class describing this report's parameters.
        """
        return TaskCountReportSchema

    @classmethod
    def get_specific_http_query_keys(cls) -> list[str]:
        """
        Names of the HTTP query parameters that are specific to this report.
        """
        return [
            ViewParam.BY_YEAR,
            ViewParam.BY_MONTH,
            ViewParam.BY_DAY_OF_MONTH,
            ViewParam.BY_TASK,
            ViewParam.BY_USER,
            ViewParam.VIA_INDEX,
        ]

    def get_rows_colnames(self, req: "CamcopsRequest") -> PlainReportType:
        """
        Run the report for a request.

        Reads the grouping options from the request and stores them on the
        instance, together with the database session and the requesting
        user's group permissions and superuser flag, for use by the query
        helpers. Then counts tasks either via ``TaskIndexEntry`` (the default)
        or by querying every task table.

        Args:
            req: the request supplying parameters, user and database session

        Returns:
            a :class:`PlainReportType` holding the rows and column names
        """
        self.dbsession = req.dbsession
        self.group_ids = req.user.ids_of_groups_user_may_report_on
        self.superuser = req.user.superuser

        self.by_year = req.get_bool_param(ViewParam.BY_YEAR, DEFAULT_BY_YEAR)
        self.by_month = req.get_bool_param(
            ViewParam.BY_MONTH, DEFAULT_BY_MONTH
        )
        self.by_day_of_month = req.get_bool_param(
            ViewParam.BY_DAY_OF_MONTH, DEFAULT_BY_DAY_OF_MONTH
        )
        self.by_task = req.get_bool_param(ViewParam.BY_TASK, DEFAULT_BY_TASK)
        self.by_user = req.get_bool_param(ViewParam.BY_USER, DEFAULT_BY_USER)
        via_index = req.get_bool_param(ViewParam.VIA_INDEX, True)

        if via_index:
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Indexed method (preferable)
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            rows, colnames = self._get_rows_colnames_via_index()
        else:
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Without using the index (worse)
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            rows, colnames = self._get_rows_colnames_without_index()

        return PlainReportType(rows=rows, column_names=colnames)

    def _grouping_plan(self) -> list[Tuple[str, bool]]:
        """
        Describe the groupings enabled for this run.

        Returns:
            ``(label, descending)`` pairs, one per enabled grouping, in output
            column order. Date parts sort descending; task and user sort
            ascending.
        """
        candidates = [
            (self.by_year, self.label_year, True),
            (self.by_month, self.label_month, True),
            (self.by_day_of_month, self.label_day_of_month, True),
            (self.by_task, self.label_task, False),
            (self.by_user, self.label_user, False),
        ]
        return [
            (label, descending)
            for enabled, label, descending in candidates
            if enabled
        ]

    def _date_part_selectors(
        self, when_utc: Any
    ) -> list[ColumnElement[Any]]:
        """
        Build the labelled year/month/day-of-month columns that are enabled.

        Args:
            when_utc: SQL expression for the task creation time, from which
                the date parts are extracted

        Returns:
            list of labelled integer column expressions (possibly empty), in
            the order year, month, day of month
        """
        parts = [
            (self.by_year, self.label_year, extract_year),
            (self.by_month, self.label_month, extract_month),
            (
                self.by_day_of_month,
                self.label_day_of_month,
                extract_day_of_month,
            ),
        ]
        # func.year() is specific to some DBs, e.g. MySQL
        # so is func.extract();
        # http://modern-sql.com/feature/extract
        return [
            cast(  # Necessary for SQLite tests
                extractor(when_utc),
                Integer(),
            ).label(label)
            for enabled, label, extractor in parts
            if enabled
        ]

    def _get_rows_colnames_via_index(self) -> Tuple[list[Row], list[str]]:
        """
        Count tasks with a single query against ``TaskIndexEntry``.

        Grouping and ordering are done in SQL, by column label.

        Returns:
            tuple: ``rows, column_names``
        """
        plan = self._grouping_plan()
        groupers: list[str] = [label for label, _descending in plan]
        sorters: list[ColumnExpressionArgument[Any]] = [
            desc(label) if descending else asc(label)
            for label, descending in plan
        ]

        selectors = self._date_part_selectors(TaskIndexEntry.when_created_utc)
        if self.by_task:
            selectors.append(
                TaskIndexEntry.task_table_name.label(self.label_task)
            )
        if self.by_user:
            selectors.append(User.username.label(self.label_user))
        # Regardless:
        selectors.append(func.count().label(self.label_n))

        # noinspection PyUnresolvedReferences
        statement = (
            select(*selectors)
            .select_from(TaskIndexEntry.__table__)
            .group_by(*groupers)
            .order_by(*sorters)
            # ... https://docs.sqlalchemy.org/en/latest/core/tutorial.html#ordering-or-grouping-by-a-label  # noqa
        )
        if self.by_user:
            # noinspection PyUnresolvedReferences
            statement = statement.select_from(User.__table__).where(
                TaskIndexEntry.adding_user_id == User.id
            )
        if not self.superuser:
            # Restrict to accessible groups
            # noinspection PyProtectedMember
            statement = statement.where(
                TaskIndexEntry.group_id.in_(self.group_ids)
            )
        rows, colnames = get_rows_fieldnames_from_select(
            self.dbsession, statement
        )
        return rows, colnames

    def _get_rows_colnames_without_index(self) -> Tuple[list[Row], list[str]]:
        """
        Count tasks by querying every task table in turn.

        Only records flagged ``_current`` are counted. When grouping by task,
        the per-table rows are used as they are; otherwise counts sharing the
        same grouping key are summed across tables in Python. The combined
        rows are then sorted in Python.

        Returns:
            tuple: ``rows, column_names``
        """
        plan = self._grouping_plan()
        # The task label is redundant as a grouper in the SQL, which involves
        # multiple queries (one per task type), but useful for the Python
        # aggregation.
        groupers: list[str] = [label for label, _descending in plan]
        # The selectors below carry exactly these labels, in this order, so
        # the column names do not depend on any query having been run.
        colnames: list[str] = groupers + [self.label_n]

        final_rows: list[Row] = []
        counter: Counter = Counter()
        for cls in Task.all_subclasses_by_tablename():
            selectors = self._date_part_selectors(
                isotzdatetime_to_utcdatetime(cls.when_created)
            )
            if self.by_task:
                selectors.append(
                    literal(cls.__tablename__).label(self.label_task)
                )
            if self.by_user:
                selectors.append(User.username.label(self.label_user))
            # Regardless:
            selectors.append(func.count().label(self.label_n))

            # noinspection PyUnresolvedReferences
            statement = (
                select(*selectors)
                .select_from(cls.__table__)
                .where(cls._current == True)  # noqa: E712
                .group_by(*groupers)
            )
            if self.by_user:
                # noinspection PyUnresolvedReferences
                statement = statement.select_from(User.__table__).where(
                    cls._adding_user_id == User.id
                )
            if not self.superuser:
                # Restrict to accessible groups
                # noinspection PyProtectedMember
                statement = statement.where(cls._group_id.in_(self.group_ids))
            rows, _fieldnames = get_rows_fieldnames_from_select(
                self.dbsession, statement
            )
            if self.by_task:
                final_rows.extend(rows)
            else:
                # Sum counts across task tables, per grouping key.
                for row in rows:
                    key = tuple(getattr(row, keyname) for keyname in groupers)
                    counter[key] += getattr(row, self.label_n)

        if not self.by_task:
            PseudoRow = namedtuple("PseudoRow", colnames)  # type: ignore[misc]  # noqa: E501
            for key, total in counter.items():
                values = list(key) + [total]
                final_rows.append(PseudoRow(*values))  # type: ignore[arg-type]

        self._sort_final_rows(final_rows, plan)
        return final_rows, colnames

    @staticmethod
    def _sort_final_rows(
        final_rows: list[Row],
        sorters: list[Tuple[str, bool]],
    ) -> None:
        """
        Sort rows in place by several keys.

        Args:
            final_rows: rows to sort; each must expose the sort keys as
                attributes
            sorters: ``(attribute_name, descending)`` pairs, most significant
                first
        """
        # Complex sorting: sort by the least significant key first and rely
        # on sort stability.
        # https://docs.python.org/3/howto/sorting.html#sort-stability-and-complex-sorts  # noqa
        for key, descending in reversed(sorters):
            final_rows.sort(key=attrgetter(key), reverse=descending)