Coverage for cc_modules/cc_taskreports.py: 31%
154 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
1"""
2camcops_server/cc_modules/cc_taskreports.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Server reports on CamCOPS tasks.**
28"""
30from collections import Counter, namedtuple
31from operator import attrgetter
32from typing import Any, Tuple, Type, TYPE_CHECKING
34from cardinal_pythonlib.classes import classproperty
35from cardinal_pythonlib.sqlalchemy.core_query import (
36 get_rows_fieldnames_from_select,
37)
38from cardinal_pythonlib.sqlalchemy.sqlfunc import (
39 extract_month,
40 extract_year,
41 extract_day_of_month,
42)
43from sqlalchemy import cast, ColumnExpressionArgument, Integer
44from sqlalchemy.engine import Row
45from sqlalchemy.sql.elements import ColumnElement
46from sqlalchemy.sql.expression import asc, desc, func, literal, select
48from camcops_server.cc_modules.cc_sqla_coltypes import (
49 isotzdatetime_to_utcdatetime,
50)
51from camcops_server.cc_modules.cc_forms import (
52 ReportParamSchema,
53 ViaIndexSelector,
54)
55from camcops_server.cc_modules.cc_pyramid import ViewParam
56from camcops_server.cc_modules.cc_report import Report, PlainReportType
57from camcops_server.cc_modules.cc_reportschema import (
58 ByDayOfMonthSelector,
59 ByMonthSelector,
60 ByTaskSelector,
61 ByUserSelector,
62 ByYearSelector,
63 DEFAULT_BY_DAY_OF_MONTH,
64 DEFAULT_BY_MONTH,
65 DEFAULT_BY_TASK,
66 DEFAULT_BY_USER,
67 DEFAULT_BY_YEAR,
68)
70from camcops_server.cc_modules.cc_task import Task
71from camcops_server.cc_modules.cc_taskindex import TaskIndexEntry
72from camcops_server.cc_modules.cc_user import User
74if TYPE_CHECKING:
75 from camcops_server.cc_modules.cc_request import CamcopsRequest
78# =============================================================================
79# Parameter schema
80# =============================================================================
class TaskCountReportSchema(ReportParamSchema):
    """
    Parameter schema for :class:`TaskCountReport`.

    Declares one selector per "group by" option (year, month, day of month,
    task, user) plus the selector that chooses whether to use the task index.
    As the per-attribute comments note, each attribute name must match the
    value of the corresponding ``ViewParam`` constant.
    """

    by_year = ByYearSelector()  # must match ViewParam.BY_YEAR
    by_month = ByMonthSelector()  # must match ViewParam.BY_MONTH
    # must match ViewParam.BY_DAY_OF_MONTH
    by_day_of_month = ByDayOfMonthSelector()
    by_task = ByTaskSelector()  # must match ViewParam.BY_TASK
    by_user = ByUserSelector()  # must match ViewParam.BY_USER
    via_index = ViaIndexSelector()  # must match ViewParam.VIA_INDEX
93# =============================================================================
94# Reports
95# =============================================================================
class TaskCountReport(Report):
    """
    Report to count task instances.

    Counts may be broken down by any combination of year, month, day of
    month, task type and adding user, as chosen via request parameters.
    The counts come either from the task index (a single query) or, if the
    index is bypassed, from one query per task table with aggregation in
    Python.
    """

    # Column labels used in the SQL and in the report output.
    label_year = "year"
    label_month = "month"
    label_day_of_month = "day_of_month"
    label_task = "task"
    label_user = "adding_user_name"
    label_n = "num_tasks_added"

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """Return the fixed identifier string of this report."""
        return "taskcount"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """Return the report title, translated via ``req.gettext``."""
        _ = req.gettext
        return _("(Server) Count current task instances")

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """Return ``False``: the report is not limited to superusers."""
        return False

    @staticmethod
    def get_paramform_schema_class() -> Type["ReportParamSchema"]:
        """Return the schema class describing this report's parameters."""
        return TaskCountReportSchema

    @classmethod
    def get_specific_http_query_keys(cls) -> list[str]:
        """Return the names of the HTTP query keys specific to this report."""
        return [
            ViewParam.BY_YEAR,
            ViewParam.BY_MONTH,
            ViewParam.BY_DAY_OF_MONTH,
            ViewParam.BY_TASK,
            ViewParam.BY_USER,
            ViewParam.VIA_INDEX,
        ]

    def get_rows_colnames(self, req: "CamcopsRequest") -> PlainReportType:
        """
        Read the report options from the request and run the report.

        The database session, the requesting user's reportable group IDs
        and superuser flag, and the "group by" options are stored on
        ``self`` for use by the two query methods.

        Args:
            req: the request supplying the session, user and parameters

        Returns:
            a :class:`PlainReportType` holding the rows and column names
        """
        self.dbsession = req.dbsession
        self.group_ids = req.user.ids_of_groups_user_may_report_on
        self.superuser = req.user.superuser

        self.by_year = req.get_bool_param(ViewParam.BY_YEAR, DEFAULT_BY_YEAR)
        self.by_month = req.get_bool_param(
            ViewParam.BY_MONTH, DEFAULT_BY_MONTH
        )
        self.by_day_of_month = req.get_bool_param(
            ViewParam.BY_DAY_OF_MONTH, DEFAULT_BY_DAY_OF_MONTH
        )
        self.by_task = req.get_bool_param(ViewParam.BY_TASK, DEFAULT_BY_TASK)
        self.by_user = req.get_bool_param(ViewParam.BY_USER, DEFAULT_BY_USER)
        via_index = req.get_bool_param(ViewParam.VIA_INDEX, True)

        if via_index:
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Indexed method (preferable)
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            rows, colnames = self._get_rows_colnames_via_index()
        else:
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Without using the index (worse)
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            rows, colnames = self._get_rows_colnames_without_index()

        return PlainReportType(rows=rows, column_names=colnames)

    def _date_parts(self) -> Tuple[Tuple[bool, Any, str], ...]:
        """
        Describe the date components that the report can group by.

        Returns:
            a tuple of ``(enabled, extractor, label)`` entries in output
            column order (year, month, day of month), where ``enabled`` is
            the corresponding "group by" option, ``extractor`` is the SQL
            function wrapper that extracts the component from a datetime
            expression, and ``label`` is the column label
        """
        return (
            (self.by_year, extract_year, self.label_year),
            (self.by_month, extract_month, self.label_month),
            (
                self.by_day_of_month,
                extract_day_of_month,
                self.label_day_of_month,
            ),
        )

    def _get_rows_colnames_via_index(self) -> Tuple[list[Row], list[str]]:
        """
        Count tasks with a single grouped query on the task index.

        Date components sort descending; task and user names ascending.
        Non-superusers only see tasks in groups they may report on.

        Returns:
            tuple: ``rows, colnames``
        """
        selectors: list[ColumnElement[Any]] = []
        groupers: list[str] = []
        sorters: list[ColumnExpressionArgument[Any]] = []
        for enabled, extractor, label in self._date_parts():
            if not enabled:
                continue
            selectors.append(
                cast(  # Necessary for SQLite tests
                    extractor(TaskIndexEntry.when_created_utc),
                    Integer(),
                ).label(label)
            )
            groupers.append(label)
            sorters.append(desc(label))
        if self.by_task:
            selectors.append(
                TaskIndexEntry.task_table_name.label(self.label_task)
            )
            groupers.append(self.label_task)
            sorters.append(asc(self.label_task))
        if self.by_user:
            selectors.append(User.username.label(self.label_user))
            groupers.append(self.label_user)
            sorters.append(asc(self.label_user))
        # Regardless:
        selectors.append(func.count().label(self.label_n))

        # noinspection PyUnresolvedReferences
        statement = (
            select(*selectors)
            .select_from(TaskIndexEntry.__table__)
            .group_by(*groupers)
            .order_by(*sorters)
            # ... https://docs.sqlalchemy.org/en/latest/core/tutorial.html#ordering-or-grouping-by-a-label  # noqa
        )
        if self.by_user:
            # Link each index entry to the user who added it.
            # noinspection PyUnresolvedReferences
            statement = statement.select_from(User.__table__).where(
                TaskIndexEntry.adding_user_id == User.id
            )
        if not self.superuser:
            # Restrict to accessible groups
            # noinspection PyProtectedMember
            statement = statement.where(
                TaskIndexEntry.group_id.in_(self.group_ids)
            )
        rows, colnames = get_rows_fieldnames_from_select(
            self.dbsession, statement
        )
        return rows, colnames

    def _get_rows_colnames_without_index(self) -> Tuple[list[Row], list[str]]:
        """
        Count tasks by querying every task table, bypassing the index.

        One grouped query is run per task class, restricted to current
        records (and, for non-superusers, to groups they may report on).
        If grouping by task, the per-table rows are used directly;
        otherwise counts for identical grouping keys are summed across
        tables in Python. The combined rows are then sorted in Python.

        Returns:
            tuple: ``rows, colnames``
        """
        date_parts = self._date_parts()

        groupers: list[str] = []
        sorters: list[Tuple[str, bool]] = []
        # ... (key, reversed/descending)
        for enabled, _extractor, label in date_parts:
            if enabled:
                groupers.append(label)
                sorters.append((label, True))
        if self.by_task:
            groupers.append(self.label_task)
            # ... redundant in the SQL, which involves multiple queries
            #     (one per task type), but useful for the Python
            #     aggregation.
            sorters.append((self.label_task, False))
        if self.by_user:
            groupers.append(self.label_user)
            sorters.append((self.label_user, False))

        final_rows = []
        # Default column names, in the same order as the labels selected by
        # each per-table query below; this keeps the result well-formed
        # even if no task table is queried.
        colnames = groupers + [self.label_n]

        classes = Task.all_subclasses_by_tablename()
        counter: Counter = Counter()
        for task_class in classes:
            selectors: list[ColumnElement[Any]] = []

            for enabled, extractor, label in date_parts:
                if not enabled:
                    continue
                selectors.append(
                    # func.year() is specific to some DBs, e.g. MySQL
                    # so is func.extract();
                    # http://modern-sql.com/feature/extract
                    cast(  # Necessary for SQLite tests
                        extractor(
                            isotzdatetime_to_utcdatetime(
                                task_class.when_created
                            )
                        ),
                        Integer(),
                    ).label(label)
                )
            if self.by_task:
                selectors.append(
                    literal(task_class.__tablename__).label(self.label_task)
                )
            if self.by_user:
                selectors.append(User.username.label(self.label_user))
            # Regardless:
            selectors.append(func.count().label(self.label_n))

            # noinspection PyUnresolvedReferences,PyProtectedMember
            statement = (
                select(*selectors)
                .select_from(task_class.__table__)
                .where(task_class._current == True)  # noqa: E712
                .group_by(*groupers)
            )
            if self.by_user:
                # Link each task to the user who added it.
                # noinspection PyUnresolvedReferences,PyProtectedMember
                statement = statement.select_from(User.__table__).where(
                    task_class._adding_user_id == User.id
                )
            if not self.superuser:
                # Restrict to accessible groups
                # noinspection PyProtectedMember
                statement = statement.where(
                    task_class._group_id.in_(self.group_ids)
                )
            rows, colnames = get_rows_fieldnames_from_select(
                self.dbsession, statement
            )
            if self.by_task:
                # Rows are already distinct per task table.
                final_rows.extend(rows)
            else:
                # Sum counts sharing a grouping key across task tables.
                for row in rows:  # type: Row
                    key = tuple(getattr(row, keyname) for keyname in groupers)
                    count = getattr(row, self.label_n)
                    counter.update({key: count})
        if not self.by_task:
            # Wrap the aggregated totals in objects offering the same
            # attribute access as database rows.
            PseudoRow = namedtuple("PseudoRow", groupers + [self.label_n])  # type: ignore[misc]  # noqa: E501
            for key, total in counter.items():
                values = list(key) + [total]
                final_rows.append(PseudoRow(*values))  # type: ignore[arg-type]  # noqa: E501

        self._sort_final_rows(final_rows, sorters)
        return final_rows, colnames

    @staticmethod
    def _sort_final_rows(
        final_rows: list[Row],
        sorters: list[Tuple[str, bool]],
    ) -> None:
        """
        Sort rows in place by several keys.

        Args:
            final_rows: rows to sort; modified in place
            sorters: ``(attribute_name, descending)`` pairs, most
                significant key first
        """
        # Complex sorting: sort by the least significant key first and rely
        # on the stability of list.sort().
        # https://docs.python.org/3/howto/sorting.html#sort-stability-and-complex-sorts  # noqa
        for key, descending in reversed(sorters):
            final_rows.sort(key=attrgetter(key), reverse=descending)