Coverage for cc_modules/cc_report.py: 31%

279 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

1""" 

2camcops_server/cc_modules/cc_report.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**CamCOPS reports.** 

27 

28""" 

29 

30import logging 

31from abc import ABC 

32from typing import ( 

33 Any, 

34 Callable, 

35 Dict, 

36 List, 

37 Optional, 

38 Sequence, 

39 Type, 

40 TYPE_CHECKING, 

41 Union, 

42) 

43 

44from cardinal_pythonlib.classes import all_subclasses, classproperty 

45from cardinal_pythonlib.datetimefunc import format_datetime 

46from cardinal_pythonlib.logs import BraceStyleAdapter 

47from cardinal_pythonlib.pyramid.responses import ( 

48 OdsResponse, 

49 TsvResponse, 

50 XlsxResponse, 

51) 

52from deform.form import Form 

53from pyramid.httpexceptions import HTTPBadRequest 

54from pyramid.renderers import render_to_response 

55from pyramid.response import Response 

56from sqlalchemy.sql.elements import ColumnElement 

57from sqlalchemy.sql.expression import and_, column, func, select 

58from sqlalchemy.sql.selectable import SelectBase 

59 

60# import as LITTLE AS POSSIBLE; this is used by lots of modules 

61from camcops_server.cc_modules.cc_constants import ( 

62 DateFormat, 

63 DEFAULT_ROWS_PER_PAGE, 

64) 

65from camcops_server.cc_modules.cc_db import FN_CURRENT, TFN_WHEN_CREATED 

66from camcops_server.cc_modules.cc_pyramid import ( 

67 CamcopsPage, 

68 PageUrl, 

69 ViewArg, 

70 ViewParam, 

71) 

72from camcops_server.cc_modules.cc_spreadsheet import ( 

73 SpreadsheetCollection, 

74 SpreadsheetPage, 

75) 

76 

77if TYPE_CHECKING: 

78 from camcops_server.cc_modules.cc_forms import ( # noqa: F401 

79 ReportParamForm, 

80 ReportParamSchema, 

81 ) 

82 from camcops_server.cc_modules.cc_request import ( 

83 CamcopsRequest, 

84 ) 

85 from camcops_server.cc_modules.cc_task import Task 

86 

# Module-level logger, named after this module. The commented-out debug calls
# elsewhere in this file pass "{}" placeholders, so messages are presumably
# formatted brace-style by BraceStyleAdapter -- confirm against
# cardinal_pythonlib.
log = BraceStyleAdapter(logging.getLogger(__name__))

88 

89 

90# ============================================================================= 

91# Other constants 

92# ============================================================================= 

93 

94 

class PlainReportType(object):
    """
    Lightweight container for the output of a plain (single-table) report:
    the data rows plus the column headings that accompany them.
    """

    def __init__(
        self, rows: Sequence[Sequence[Any]], column_names: Sequence[str]
    ) -> None:
        """
        Args:
            rows:
                the report data; one inner sequence per row
            column_names:
                the column headings for those rows
        """
        self.column_names = column_names
        self.rows = rows

105 

106 

107# ============================================================================= 

108# Report class 

109# ============================================================================= 

110 

111 

class Report(object):
    """
    Abstract base class representing a report.

    If you are writing a report, you must override these attributes:

    - :meth:`report_id`
    - :meth:`title`
    - One combination of:

      - (simplest) :meth:`get_query` OR :meth:`get_rows_colnames`
      - (for multi-page results) :meth:`render_html` and
        :meth:`get_spreadsheet_pages`
      - (manual control) all ``render_*`` functions

    See the explanations of each.
    """

    # Template used by render_single_page_html(); subclasses may replace it.
    template_name = "report.mako"

    # -------------------------------------------------------------------------
    # Attributes that must be provided
    # -------------------------------------------------------------------------
    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """
        Returns an identifying string, unique to this report, used in the HTML
        report selector.
        """
        raise NotImplementedError("implement in subclass")

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """
        Descriptive title for display purposes.
        """
        raise NotImplementedError("implement in subclass")

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """
        If ``True`` (the default), only superusers may run the report.
        You must explicitly override this property to permit others.
        """
        return True

    @classmethod
    def get_http_query_keys(cls) -> List[str]:
        """
        Returns the keys used for the HTTP GET query. They include details of:

        - which report?
        - how to view it?
        - pagination options
        - report-specific configuration details from
          :func:`get_specific_http_query_keys`.
        """
        return [
            ViewParam.REPORT_ID,
            ViewParam.VIEWTYPE,
            ViewParam.ROWS_PER_PAGE,
            ViewParam.PAGE,
        ] + cls.get_specific_http_query_keys()

    @classmethod
    def get_specific_http_query_keys(cls) -> List[str]:
        """
        Additional HTTP GET query keys used by this report. Override to add
        custom ones.
        """
        return []

    def get_query(self, req: "CamcopsRequest") -> Optional[SelectBase]:
        """
        Overriding this function is one way of providing a report. (The other
        is :func:`get_rows_colnames`.)

        To override this function, return the SQLAlchemy Base :class:`Select`
        statement to execute the report.

        (Do not return a :class:`Query`; that can no longer be executed via
        ``session.execute()`` in SQLAlchemy 2.)

        Parameters are passed in via the request.

        The default returns ``None``, meaning "no query provided".
        """
        return None

    def get_rows_colnames(
        self, req: "CamcopsRequest"
    ) -> Optional[PlainReportType]:
        """
        Overriding this function is one way of providing a report. (The other
        is :func:`get_query`.)

        To override this function, return a :class:`PlainReportType` with
        column names and row content.

        The default returns ``None``, meaning "no rows provided".
        """
        return None

    @staticmethod
    def get_paramform_schema_class() -> Type["ReportParamSchema"]:
        """
        Returns the class used as the Colander schema for the form that
        configures the report. By default, this is a simple form that just
        offers a choice of output format, but you can provide a more
        extensive one (an example being in
        :class:`camcops_server.tasks.diagnosis.DiagnosisFinderReportBase`).
        """
        from camcops_server.cc_modules.cc_forms import (
            ReportParamSchema,
        )  # delayed import

        return ReportParamSchema

    def get_form(self, req: "CamcopsRequest") -> Form:
        """
        Returns a Colander form to configure the report. The default usually
        suffices, and it will use the schema specified in
        :func:`get_paramform_schema_class`.
        """
        from camcops_server.cc_modules.cc_forms import (  # noqa: F811
            ReportParamForm,
        )  # delayed import

        schema_class = self.get_paramform_schema_class()
        return ReportParamForm(request=req, schema_class=schema_class)

    @staticmethod
    def get_test_querydict() -> Dict[str, Any]:
        """
        What this function returns is used as the specimen Colander
        ``appstruct`` for unit tests. The default is an empty dictionary.
        """
        return {}

    @staticmethod
    def add_task_report_filters(wheres: List[ColumnElement]) -> None:
        """
        Adds any restrictions required to a list of SQLAlchemy Core ``WHERE``
        clauses.

        Override this (or provide additional filters and call this) to provide
        global filters to queries used to create reports.

        Used by :class:`DateTimeFilteredReportMixin`, etc.

        The presumption is that the thing being filtered is an instance of
        :class:`camcops_server.cc_modules.cc_task.Task`.

        Args:
            wheres:
                list of SQL ``WHERE`` conditions, each represented as an
                SQLAlchemy :class:`ColumnElement`. This list is modified in
                place. The caller will need to apply the final list to the
                query.
        """
        # "== True" is deliberate: it builds an SQL expression, not a Python
        # boolean, so it must not be "simplified" to an identity test.
        # noinspection PyPep8
        wheres.append(column(FN_CURRENT) == True)  # noqa: E712

    # -------------------------------------------------------------------------
    # Common functionality: classmethods
    # -------------------------------------------------------------------------

    @classmethod
    def all_subclasses(cls) -> List[Type["Report"]]:
        """
        Get all report subclasses, except those not implementing their
        ``report_id`` property (i.e. those that are still abstract) and any
        class named ``TestReport``.

        The result is not sorted here; :func:`get_all_report_classes` sorts
        by title.
        """
        concrete_report_classes = []  # type: List[Type["Report"]]
        # noinspection PyTypeChecker
        for reportcls in all_subclasses(cls):
            if reportcls.__name__ == "TestReport":
                continue

            try:
                _ = reportcls.report_id
            except NotImplementedError:
                # This is a subclass of Report, but it's still an abstract
                # class; skip it.
                continue
            concrete_report_classes.append(reportcls)
        return concrete_report_classes

    # -------------------------------------------------------------------------
    # Common functionality: default Response
    # -------------------------------------------------------------------------

    def get_response(self, req: "CamcopsRequest") -> Response:
        """
        Return the report content itself, as an HTTP :class:`Response`.

        Raises:
            HTTPBadRequest:
                if the request's report ID is not this report's ID, or if
                the requested view type is not one of HTML/ODS/TSV/XLSX
        """
        # Check the basic parameters
        report_id = req.get_str_param(ViewParam.REPORT_ID)

        if report_id != self.report_id:
            raise HTTPBadRequest("Error - request directed to wrong report!")

        # viewtype = req.get_str_param(ViewParam.VIEWTYPE, ViewArg.HTML,
        #                              lower=True)
        # ... NO; for a Deform radio button, the request contains parameters
        # like
        #   ('__start__', 'viewtype:rename'),
        #   ('deformField2', 'tsv'),
        #   ('__end__', 'viewtype:rename')
        # ... so we need to ask the appstruct instead.
        # This is a bit different from how we manage trackers/CTVs, where we
        # recode the appstruct to a URL.
        #
        # viewtype = appstruct.get(ViewParam.VIEWTYPE)  # type: str
        #
        # Ah, no... that fails with pagination of reports. Let's redirect
        # things to the HTTP query, as for trackers/audit!

        viewtype = req.get_str_param(
            ViewParam.VIEWTYPE, ViewArg.HTML, lower=True
        )
        # Run the report (which may take additional parameters from the
        # request)
        # Serve the result
        if viewtype == ViewArg.HTML:
            return self.render_html(req=req)

        if viewtype == ViewArg.ODS:
            return self.render_ods(req=req)

        if viewtype == ViewArg.TSV:
            return self.render_tsv(req=req)

        if viewtype == ViewArg.XLSX:
            return self.render_xlsx(req=req)

        raise HTTPBadRequest("Bad viewtype")

    def render_html(self, req: "CamcopsRequest") -> Response:
        """
        Returns the report as a paginated HTML response.

        The page size and page number are read from the request
        (``ViewParam.ROWS_PER_PAGE``, defaulting to ``DEFAULT_ROWS_PER_PAGE``,
        and ``ViewParam.PAGE``, defaulting to 1).
        """
        rows_per_page = req.get_int_param(
            ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE
        )
        page_num = req.get_int_param(ViewParam.PAGE, 1)

        plain_report = self._get_plain_report(req)

        page = CamcopsPage(
            collection=plain_report.rows,
            page=page_num,
            items_per_page=rows_per_page,
            url_maker=PageUrl(req),
            request=req,
        )

        return self.render_single_page_html(
            req=req, column_names=plain_report.column_names, page=page
        )

    def render_tsv(self, req: "CamcopsRequest") -> TsvResponse:
        """
        Returns the report as a TSV download.

        TSV holds a single table, so only the first spreadsheet page is used.
        """
        filename = self.get_filename(req, ViewArg.TSV)

        # By default there is only one page. If there are more,
        # we only output the first
        page = self.get_spreadsheet_pages(req)[0]

        return TsvResponse(body=page.get_tsv(), filename=filename)

    def render_xlsx(self, req: "CamcopsRequest") -> XlsxResponse:
        """
        Returns the report (all spreadsheet pages) as an XLSX download.
        """
        filename = self.get_filename(req, ViewArg.XLSX)
        tsvcoll = self.get_spreadsheet_collection(req)
        content = tsvcoll.as_xlsx()

        return XlsxResponse(body=content, filename=filename)

    def render_ods(self, req: "CamcopsRequest") -> OdsResponse:
        """
        Returns the report (all spreadsheet pages) as an ODS download.
        """
        filename = self.get_filename(req, ViewArg.ODS)
        tsvcoll = self.get_spreadsheet_collection(req)
        content = tsvcoll.as_ods()

        return OdsResponse(body=content, filename=filename)

    def get_spreadsheet_collection(
        self, req: "CamcopsRequest"
    ) -> SpreadsheetCollection:
        """
        Returns a :class:`SpreadsheetCollection` containing every page from
        :meth:`get_spreadsheet_pages`.
        """
        coll = SpreadsheetCollection()
        coll.add_pages(self.get_spreadsheet_pages(req))

        return coll

    def get_spreadsheet_pages(
        self, req: "CamcopsRequest"
    ) -> List[SpreadsheetPage]:
        """
        Returns the report as a list of spreadsheet pages. The default is a
        single page, named after the report's title, built from the plain
        report. Override for multi-page reports.
        """
        plain_report = self._get_plain_report(req)

        page = self.get_spreadsheet_page(
            name=self.title(req),
            column_names=plain_report.column_names,
            rows=plain_report.rows,
        )
        return [page]

    @staticmethod
    def get_spreadsheet_page(
        name: str, column_names: Sequence[str], rows: Sequence[Sequence[Any]]
    ) -> SpreadsheetPage:
        """
        Builds one :class:`SpreadsheetPage` from column names and rows.

        Each row is converted to a dictionary by pairing its values with
        ``column_names`` positionally (via ``zip``, so any surplus values or
        names are dropped).
        """
        keyed_rows = [dict(zip(column_names, r)) for r in rows]
        page = SpreadsheetPage(name=name, rows=keyed_rows)

        return page

    def get_filename(self, req: "CamcopsRequest", viewtype: str) -> str:
        """
        Returns the download filename for this report, in the form
        ``CamCOPS_<report_id>_<timestamp>.<extension>``, where the timestamp
        is the request's "now" in ``DateFormat.FILENAME`` format.

        Raises:
            HTTPBadRequest:
                if ``viewtype`` is not one of ODS/TSV/XLSX
        """
        extension_dict = {
            ViewArg.ODS: "ods",
            ViewArg.TSV: "tsv",
            ViewArg.XLSX: "xlsx",
        }

        if viewtype not in extension_dict:
            raise HTTPBadRequest("Unsupported viewtype")

        # Membership was checked above, so index directly; .get() would give
        # an Optional that cannot safely be joined into a string.
        extension = extension_dict[viewtype]
        timestamp = format_datetime(req.now, DateFormat.FILENAME)

        return f"CamCOPS_{self.report_id}_{timestamp}.{extension}"

    def render_single_page_html(
        self,
        req: "CamcopsRequest",
        column_names: Sequence[str],
        page: CamcopsPage,
    ) -> Response:
        """
        Converts a paginated report into an HTML response.

        If you wish, you can override this for more report customization.
        """
        return render_to_response(
            self.template_name,
            dict(
                title=self.title(req),
                page=page,
                column_names=column_names,
                report_id=self.report_id,
            ),
            request=req,
        )

    def _get_plain_report(self, req: "CamcopsRequest") -> PlainReportType:
        """
        Uses :meth:`get_query`, or if absent, :meth:`get_rows_colnames`, to
        fetch data. Returns a "single-page" type report, in the form of a
        :class:`PlainReportType`.

        Raises:
            NotImplementedError:
                if both :meth:`get_query` and :meth:`get_rows_colnames`
                return ``None``
        """
        statement = self.get_query(req)
        if statement is not None:
            rp = req.dbsession.execute(statement)
            column_names = rp.keys()
            rows = rp.fetchall()

            plain_report = PlainReportType(
                rows=rows, column_names=column_names  # type: ignore[arg-type]
            )
        else:
            plain_report = self.get_rows_colnames(req)
            if plain_report is None:
                raise NotImplementedError(
                    "Report did not implement either of get_query()"
                    " or get_rows_colnames()"
                )

        return plain_report

488 

489 

class PercentageSummaryReportMixin(object):
    """
    Mixin to be used with :class:`Report`.

    Provides :meth:`get_percentage_summaries`. The class it is mixed into
    must supply ``add_task_report_filters()`` (as :class:`Report` does) and
    must override :meth:`task_class`.
    """

    # noinspection PyMethodParameters
    @classproperty
    def task_class(cls) -> Type["Task"]:
        """
        The task class whose table is summarized. Must be overridden.
        """
        raise NotImplementedError("implement in subclass")

    def get_percentage_summaries(
        self,
        req: "CamcopsRequest",
        column_dict: Dict[str, str],
        num_answers: int,
        cell_format: str = "{}",
        min_answer: int = 0,
    ) -> List[List[str]]:
        """
        Provides a summary of each question, x% of people said each response.

        Args:
            req:
                the request; its ``dbsession`` is used to run the queries
            column_dict:
                maps each database column name to the question text shown
                in the first cell of that question's row
            num_answers:
                number of percentage cells per row (one per possible answer)
            cell_format:
                ``str.format`` template applied to each percentage
            min_answer:
                answer value that corresponds to the first percentage cell

        Returns:
            one row per question, laid out as::

                  0          1                 2              3
                +----------+-----------------+--------------+-----------
                | question | total responses | % 1st answer | % 2nd ...
                +----------+-----------------+--------------+-----------

            Percentage cells for answers that nobody gave remain ``""``.
            (The total is stored as returned by the database, not as a
            string.)

        Answers outside ``min_answer .. min_answer + num_answers - 1`` are
        skipped with a warning. Indexing the row with them would either
        overwrite an unrelated cell (negative offsets wrap around) or raise
        an ``IndexError``.
        """
        rows = []

        for column_name, question in column_dict.items():
            # Use a real column expression throughout. A bare string passed
            # to func.count() would be sent as a literal value, not as a
            # column reference.
            answer_column = column(column_name)

            # e.g. SELECT COUNT(col) FROM perinatal_poem WHERE col IS NOT NULL
            wheres = [answer_column.isnot(None)]

            # noinspection PyUnresolvedReferences
            self.add_task_report_filters(wheres)  # type: ignore[attr-defined]

            # noinspection PyUnresolvedReferences
            total_query = (
                select(func.count(answer_column))
                .select_from(self.task_class.__table__)
                .where(and_(*wheres))
            )

            total_responses = req.dbsession.execute(total_query).fetchone()[0]

            row = [question] + [total_responses] + [""] * num_answers

            # e.g.
            # SELECT col, ((100 * COUNT(col)) / total_responses)
            # FROM perinatal_poem WHERE col IS NOT NULL
            # GROUP BY col
            #
            # If total_responses is zero, no rows match, so nothing is
            # returned and the division is never evaluated.
            # noinspection PyUnresolvedReferences
            query = (
                select(  # type: ignore[var-annotated]
                    answer_column,
                    ((100 * func.count(answer_column)) / total_responses),
                )
                .select_from(self.task_class.__table__)
                .where(and_(*wheres))
                .group_by(answer_column)
            )

            for answer, percentage in req.dbsession.execute(query):
                offset = answer - min_answer
                if not 0 <= offset < num_answers:
                    log.warning(
                        "Percentage summary: ignoring answer {} in column "
                        "{}; expected a value from {} to {}",
                        answer,
                        column_name,
                        min_answer,
                        min_answer + num_answers - 1,
                    )
                    continue
                # The first two cells hold the question and the total.
                row[2 + offset] = cell_format.format(percentage)

            rows.append(row)

        return rows

561 

562 

class DateTimeFilteredReportMixin(object):
    """
    Mixin for reports that can be restricted to a date/time window.

    It reads ``ViewParam.START_DATETIME`` and ``ViewParam.END_DATETIME`` from
    the request when a response is generated, and adds the corresponding
    "when created" conditions to the task report filters.
    """

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
        # Both are populated by get_response(); None means "no limit".
        self.start_datetime = None  # type: Optional[str]
        self.end_datetime = None  # type: Optional[str]

    @staticmethod
    def get_paramform_schema_class() -> Type["ReportParamSchema"]:
        """
        Returns the schema class for the report's configuration form: here,
        ``DateTimeFilteredReportParamSchema``.
        """
        from camcops_server.cc_modules.cc_forms import (
            DateTimeFilteredReportParamSchema,
        )  # delayed import

        return DateTimeFilteredReportParamSchema

    @classmethod
    def get_specific_http_query_keys(cls) -> List[str]:
        """
        Returns the parent's report-specific HTTP GET keys, followed by the
        start and end date/time keys.
        """
        # noinspection PyUnresolvedReferences
        inherited_keys = super().get_specific_http_query_keys()  # type: ignore[misc]  # noqa: E501
        return inherited_keys + [
            ViewParam.START_DATETIME,
            ViewParam.END_DATETIME,
        ]

    def get_response(self, req: "CamcopsRequest") -> Response:
        """
        Stores the requested start/end date/times (formatted with
        ``DateFormat.ERA``) on this instance, then delegates to the parent's
        ``get_response()``.
        """
        start = req.get_datetime_param(ViewParam.START_DATETIME)
        self.start_datetime = format_datetime(start, DateFormat.ERA)

        end = req.get_datetime_param(ViewParam.END_DATETIME)
        self.end_datetime = format_datetime(end, DateFormat.ERA)

        # noinspection PyUnresolvedReferences
        return super().get_response(req)  # type: ignore[misc]

    def add_task_report_filters(self, wheres: List[ColumnElement]) -> None:
        """
        Adds any restrictions required to a list of SQLAlchemy Core ``WHERE``
        clauses.

        See :meth:`Report.add_task_report_filters`.

        On top of the parent's filters, this restricts the creation time to
        the half-open interval ``start_datetime <= when_created <
        end_datetime``; a limit that is ``None`` is not applied.

        Args:
            wheres:
                list of SQL ``WHERE`` conditions, each represented as an
                SQLAlchemy :class:`ColumnElement`. This list is modified in
                place. The caller will need to apply the final list to the
                query.
        """
        # noinspection PyUnresolvedReferences
        super().add_task_report_filters(wheres)  # type: ignore[misc]

        when_created = column(TFN_WHEN_CREATED)
        start, end = self.start_datetime, self.end_datetime

        if start is not None:
            wheres.append(when_created >= start)

        if end is not None:
            wheres.append(when_created < end)

618 

619 

class ScoreDetails(object):
    """
    Describes one kind of score whose change over time is to be reported.
    """

    def __init__(
        self,
        name: str,
        scorefunc: Callable[["Task"], Union[None, int, float]],
        minimum: int,
        maximum: int,
        higher_score_is_better: bool = False,
    ) -> None:
        """
        Args:
            name:
                human-readable name for the score
            scorefunc:
                callable taking a task instance as its only argument and
                returning that task's numerical score (or ``None``)
            minimum:
                lowest possible value of the score (for display purposes)
            maximum:
                highest possible value of the score (for display purposes)
            higher_score_is_better:
                does a higher score mean a better outcome?
        """
        self.higher_score_is_better = higher_score_is_better
        self.maximum = maximum
        self.minimum = minimum
        self.scorefunc = scorefunc
        self.name = name

    def calculate_improvement(
        self, first_score: float, latest_score: float
    ) -> float:
        """
        Returns the change between two scores, signed so that an improvement
        is positive.

        That is ``latest - first`` when higher scores are better, and
        ``first - latest`` when they are not.
        """
        return (
            latest_score - first_score
            if self.higher_score_is_better
            else first_score - latest_score
        )

667 

668 

class AverageScoreReport(DateTimeFilteredReportMixin, Report, ABC):
    """
    Used by MAAS, CORE-10 and PBQ to report average scores and progress

    Subclasses must override :meth:`task_class` and :meth:`scoretypes` (and,
    as for any :class:`Report`, ``report_id`` and ``title``).
    """

    template_name = "average_score_report.mako"

    def __init__(
        self, *args: Any, via_index: bool = True, **kwargs: Any
    ) -> None:
        """
        Args:
            via_index:
                set this to ``False`` for unit test when you don't want to
                have to build a dummy task index.
        """
        super().__init__(*args, **kwargs)
        self.via_index = via_index

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """
        These reports may be run by users who are not superusers.
        """
        return False

    # noinspection PyMethodParameters
    @classproperty
    def task_class(cls) -> Type["Task"]:
        """
        The task class whose scores are averaged. Must be overridden.
        """
        raise NotImplementedError("Report did not implement task_class")

    @classmethod
    def scoretypes(cls, req: "CamcopsRequest") -> List[ScoreDetails]:
        """
        The scores to report on, one :class:`ScoreDetails` each. Must be
        overridden.
        """
        raise NotImplementedError("Report did not implement 'scoretypes'")

    @staticmethod
    def no_data_value() -> Any:
        """
        The value used for a "no data" cell.

        The only reason this is accessible outside this class is for unit
        testing.
        """
        return ""

    def render_html(self, req: "CamcopsRequest") -> Response:
        """
        Renders the report as HTML, passing the first spreadsheet page (the
        averages) and the second (the date filters) to the template.
        """
        pages = self.get_spreadsheet_pages(req)
        return render_to_response(
            self.template_name,
            dict(
                title=self.title(req),
                mainpage=pages[0],
                datepage=pages[1],
                report_id=self.report_id,
            ),
            request=req,
        )

    def get_spreadsheet_pages(
        self, req: "CamcopsRequest"
    ) -> List[SpreadsheetPage]:
        """
        Returns two pages: the average scores, then the date filters used.

        For each patient, the earliest complete task provides the "first"
        score. If the patient has more than one task, the most recent one
        provides the "latest" score and the improvement.

        We use an SQLAlchemy ORM, rather than Core, method. Why?

        - "Patient equality" is complex (e.g. same patient_id on same device,
          or a shared ID number, etc.) -- simplicity via Patient.__eq__.
        - Facilitates "is task complete?" checks, and use of Python
          calculations.
        """
        _ = req.gettext
        from camcops_server.cc_modules.cc_taskcollection import (
            TaskCollection,
            task_when_created_sorter,
        )  # delayed import
        from camcops_server.cc_modules.cc_taskfilter import (
            TaskFilter,
        )  # delayed import

        # Which tasks?
        taskfilter = TaskFilter()
        taskfilter.task_types = [self.task_class.__tablename__]
        taskfilter.start_datetime = self.start_datetime  # type: ignore[assignment]  # noqa: E501
        taskfilter.end_datetime = self.end_datetime  # type: ignore[assignment]
        taskfilter.complete_only = True

        # Get tasks
        collection = TaskCollection(
            req=req,
            taskfilter=taskfilter,
            current_only=True,
            via_index=self.via_index,
        )
        all_tasks = collection.all_tasks

        # Get all distinct patients
        patients = {t.patient for t in all_tasks}
        # log.debug("all_tasks: {}", all_tasks)
        # log.debug("patients: {}", [str(p) for p in patients])

        scoretypes = self.scoretypes(req)
        n_scoretypes = len(scoretypes)

        # Sum first/last/progress scores by patient
        sum_first_by_score: list[Union[int, float]] = [0] * n_scoretypes
        sum_last_by_score: list[Union[int, float]] = [0] * n_scoretypes
        sum_improvement_by_score: list[Union[int, float]] = [0] * n_scoretypes
        n_first = 0
        n_last = 0  # also n_progress
        for patient in patients:
            # Find tasks for this patient. This deliberately compares with
            # "==" (Patient.__eq__) rather than grouping by hash; see the
            # docstring.
            patient_tasks = [t for t in all_tasks if t.patient == patient]
            assert patient_tasks, f"No tasks for patient {patient}"
            # log.debug("For patient {}, tasks: {}", patient, patient_tasks)
            # Find first and last task (last may be absent)
            patient_tasks.sort(key=task_when_created_sorter)
            first = patient_tasks[0]
            n_first += 1
            if len(patient_tasks) > 1:
                last = patient_tasks[-1]
                n_last += 1
            else:
                last = None

            # Obtain first/last scores and progress
            for scoreidx, scoretype in enumerate(scoretypes):
                firstscore = scoretype.scorefunc(first)
                # Scores should not be None, because all tasks are complete.
                sum_first_by_score[scoreidx] += firstscore
                # Test against None explicitly rather than relying on the
                # truthiness of a task object.
                if last is not None:
                    lastscore = scoretype.scorefunc(last)
                    sum_last_by_score[scoreidx] += lastscore
                    improvement = scoretype.calculate_improvement(
                        firstscore, lastscore
                    )
                    sum_improvement_by_score[scoreidx] += improvement

        # Format output
        column_names = [
            _("Number of initial records"),
            _("Number of latest subsequent records"),
        ]
        row = [n_first, n_last]
        no_data = self.no_data_value()
        for scoreidx, scoretype in enumerate(scoretypes):
            # Calculations
            if n_first == 0:
                avg_first = no_data
            else:
                avg_first = sum_first_by_score[scoreidx] / n_first
            if n_last == 0:
                avg_last = no_data
                avg_improvement = no_data
            else:
                avg_last = sum_last_by_score[scoreidx] / n_last
                avg_improvement = sum_improvement_by_score[scoreidx] / n_last

            # Columns and row data
            name_with_range = (
                f"{scoretype.name} ({scoretype.minimum}–{scoretype.maximum})"
            )
            column_names += [
                f"{name_with_range}: {_('First')}",
                f"{name_with_range}: {_('Latest')}",
                f"{scoretype.name}: {_('Improvement')}",
            ]
            row += [avg_first, avg_last, avg_improvement]

        # Create and return report
        mainpage = self.get_spreadsheet_page(
            name=self.title(req), column_names=column_names, rows=[row]
        )
        datepage = self.get_spreadsheet_page(
            name=_("Date filters"),
            column_names=[_("Start date"), _("End date")],
            rows=[[str(self.start_datetime), str(self.end_datetime)]],
        )
        return [mainpage, datepage]

844 

845 

846# ============================================================================= 

847# Report framework 

848# ============================================================================= 

849 

850 

def get_all_report_classes(req: "CamcopsRequest") -> List[Type["Report"]]:
    """
    Returns every concrete :class:`Report` subclass (i.e. every report type),
    ordered by the title each one gives for this request.
    """
    return sorted(
        Report.all_subclasses(),
        key=lambda report_class: report_class.title(req),
    )

858 

859 

def get_report_instance(report_id: str) -> Optional[Report]:
    """
    Looks up the :class:`Report` subclass whose ``report_id`` matches the one
    given and returns a new instance of it. Returns ``None`` if the ID is
    empty or no report has that ID.
    """
    if report_id:
        matching_class = next(
            (
                report_class
                for report_class in Report.all_subclasses()
                if report_class.report_id == report_id
            ),
            None,
        )
        if matching_class is not None:
            return matching_class()
    return None