Coverage for cc_modules/cc_export.py: 31%

476 statements  

coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

# noinspection HttpUrlsUsage
"""
camcops_server/cc_modules/cc_export.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

.. _ActiveMQ: https://activemq.apache.org/
.. _AMQP: https://www.amqp.org/
.. _APScheduler: https://apscheduler.readthedocs.io/
.. _Celery: https://www.celeryproject.org/
.. _Dramatiq: https://dramatiq.io/
.. _RabbitMQ: https://www.rabbitmq.com/
.. _Redis: https://redis.io/
.. _ZeroMQ: https://zeromq.org/

**Export and research dump functions.**

Export design:

*WHICH RECORDS TO SEND?*

The most powerful mechanism is not to have a sending queue (which would then
require careful multi-instance locking), but to have a "sent" log. That way:

- A record needs sending if it's not in the sent log (for an appropriate
  recipient).
- You can add a new recipient and the system will know about the (new)
  backlog automatically.
- You can specify criteria, e.g. don't upload records before 1/1/2014, and
  modify that later, and it would catch up with the backlog.
- Successes and failures are logged in the same table.
- Multiple recipients are handled with ease.
- No need to alter database.pl code that receives from tablets.
- Can run with a simple cron job.
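
A minimal sketch of the "sent log" idea, using an in-memory SQLite database
from the standard library. The schema (``task``, ``sent_log`` and their
columns) and the recipient names are hypothetical and are not the real CamCOPS
tables. In this module, the equivalent check is
``ExportedTask.task_already_exported()``. Under this approach, "what still
needs sending" is simply a query. A new recipient's backlog therefore appears
without extra bookkeeping, and in this sketch a logged failure
(``succeeded = 0``) is picked up again on the next run.

.. code-block:: python

    import sqlite3


    def make_demo_db() -> sqlite3.Connection:
        # Hypothetical schema, for illustration only.
        conn = sqlite3.connect(":memory:")
        conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY)")
        conn.execute(
            "CREATE TABLE sent_log "
            "(task_id INTEGER, recipient TEXT, succeeded INTEGER)"
        )
        conn.executemany("INSERT INTO task (id) VALUES (?)", [(1,), (2,), (3,)])
        # Successes and failures are logged in the same table.
        conn.executemany(
            "INSERT INTO sent_log VALUES (?, ?, ?)",
            [(1, "recipient_a", 1), (2, "recipient_a", 0)],
        )
        return conn


    def pending_task_ids(conn: sqlite3.Connection, recipient: str) -> list:
        # A task needs sending if the log has no successful entry for this
        # (task, recipient) pair.
        rows = conn.execute(
            "SELECT t.id FROM task t WHERE NOT EXISTS ("
            " SELECT 1 FROM sent_log s"
            " WHERE s.task_id = t.id AND s.recipient = ? AND s.succeeded = 1"
            ") ORDER BY t.id",
            (recipient,),
        )
        return [row[0] for row in rows]


    conn = make_demo_db()
    print(pending_task_ids(conn, "recipient_a"))  # [2, 3]
    print(pending_task_ids(conn, "recipient_b"))  # [1, 2, 3]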

*LOCKING*

- Don't use database locking:
  https://blog.engineyard.com/2011/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you
- Locking via UNIX lockfiles:

  - https://pypi.python.org/pypi/lockfile
  - http://pythonhosted.org/lockfile/ (which also works on Windows)

  - On UNIX, ``lockfile`` uses ``LinkLockFile``:
    https://github.com/smontanaro/pylockfile/blob/master/lockfile/linklockfile.py
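
A minimal sketch of the non-blocking lock used in this module, written with
the standard library only. In this module that lock is
``lockfile.FileLock(..., timeout=0)``, which raises ``lockfile.AlreadyLocked``
if someone else holds the lock. The sketch swaps in a different technique from
``LinkLockFile``: ``os.open()`` with ``O_CREAT | O_EXCL`` creates the lockfile
atomically, so only one process can succeed. ``exclusive_lock`` and
``AlreadyLocked`` are hypothetical names. As with any lockfile scheme, a
process that dies without releasing the lock leaves a stale file behind.

.. code-block:: python

    import os
    import tempfile
    from contextlib import contextmanager


    class AlreadyLocked(Exception):
        # Hypothetical stand-in for lockfile.AlreadyLocked.
        pass


    @contextmanager
    def exclusive_lock(path: str):
        # O_CREAT | O_EXCL: "create, but fail if it exists", done atomically.
        try:
            fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            # Like timeout=0: don't wait; give up at once.
            raise AlreadyLocked(path) from None
        try:
            with os.fdopen(fd, "w") as f:
                f.write(str(os.getpid()))  # record the holder, for debugging
            yield path
        finally:
            os.remove(path)  # release the lock


    lock_path = os.path.join(tempfile.mkdtemp(), "recipient_task.lock")
    with exclusive_lock(lock_path):
        try:
            with exclusive_lock(lock_path):
                pass
        except AlreadyLocked:
            print("Refused: another holder is doing this work")
    print(os.path.exists(lock_path))  # False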

*MESSAGE QUEUE AND BACKEND*

Thoughts as of 2018-12-22.

- See https://www.fullstackpython.com/task-queues.html. Also http://queues.io/;
  https://stackoverflow.com/questions/731233/activemq-or-rabbitmq-or-zeromq-or.

- The "default" is Celery_, with ``celery beat`` for scheduling, via an
  AMQP_ broker like RabbitMQ_.

  - Downside: no longer supported under Windows as of Celery 4.

  - There are immediate bugs when running the demo code with Celery 4.2.1,
    fixed by setting the environment variable
    (``set FORKED_BY_MULTIPROCESSING=1``) before running the worker; see
    https://github.com/celery/celery/issues/4178 and
    https://github.com/celery/celery/pull/4078.

  - Downside: backend is complex; e.g. Erlang dependency of RabbitMQ.

  - Celery also supports Redis_, but Redis_ doesn't support Windows directly
    (except the Windows Subsystem for Linux in Windows 10+).

- Another possibility is Dramatiq_ with APScheduler_.

  - Of note, APScheduler_ can use an SQLAlchemy database table as its job
    store, which might be good.
  - Dramatiq_ uses RabbitMQ_ or Redis_.
  - Dramatiq_ 1.4.0 (2018-11-25) installs cleanly under Windows. Use
    ``pip install --upgrade "dramatiq[rabbitmq, watch]"`` (i.e. with double
    quotes, not the single quotes it suggests, which don't work under
    Windows).
  - However, the basic example (https://dramatiq.io/guide.html) fails under
    Windows; when you fire up ``dramatiq count_words`` (even with
    ``--processes 1 --threads 1``) it crashes with an error from
    ``ForkingPickler`` in ``multiprocessing.reduction``, i.e.
    https://docs.python.org/3/library/multiprocessing.html#windows. It also
    emits a ``PermissionError: [WinError 5] Access is denied``. This is
    discussed a bit at https://github.com/Bogdanp/dramatiq/issues/75;
    https://github.com/Bogdanp/dramatiq/blob/master/docs/source/changelog.rst.
    The changelog suggests 1.4.0 should work, but it doesn't.

- Worth some thought about ZeroMQ_, which is a very different sort of thing.
  Very cross-platform. Needs work to guard against message loss (i.e. messages
  are unreliable by default). Dynamic "special socket" style.

- Possibly also ActiveMQ_.

- OK; so speed is not critical but we want message reliability, for it to work
  under Windows, and decent Python bindings with job scheduling.

  - OUT: Redis (not Windows easily), ZeroMQ (fast but not by default reliable),
    ActiveMQ (few Python frameworks?).
  - REMAINING for message handling: RabbitMQ.
  - Python options therefore: Celery (but Windows not officially supported from
    4+); Dramatiq (but Windows also not very well supported and seems a bit
    bleeding-edge).

- This is looking like a mess from the Windows perspective.

- An alternative is just to use the database, of course.

  - https://softwareengineering.stackexchange.com/questions/351449/message-queue-database-vs-dedicated-mq
  - http://mikehadlow.blogspot.com/2012/04/database-as-queue-anti-pattern.html
  - https://blog.jooq.org/2014/09/26/using-your-rdbms-for-messaging-is-totally-ok/
  - https://stackoverflow.com/questions/13005410/why-do-we-need-message-brokers-like-rabbitmq-over-a-database-like-postgresql
  - https://www.quora.com/What-is-the-best-practice-using-db-tables-or-message-queues-for-moderation-of-content-approved-by-humans

- Let's take a step back and summarize the problem.

  - Many web threads may upload tasks. This should trigger a prompt export for
    all push recipients.
  - Whichever way we schedule a backend export job, it should be keyed by the
    combination of recipient, basetable and task PK. (That way, if one
    recipient fails, the others can proceed independently.)
  - Every job should check that it's not been completed already (in case of
    accidental job restarts), i.e. it should be as idempotent as we can make
    it.
  - How should this interact with the non-push recipients?
  - We should use the same locking method for push and non-push recipients.
  - We should make the locking granular and use file locks -- for example, for
    each task/recipient combination (or each whole-database export for a given
    recipient).
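
A minimal sketch of the job design summarized above. It uses one job per
(recipient, basetable, task PK) and one lockfile name per job. It also adds an
idempotency check, so that a restarted or duplicated job does nothing. The
names ``ExportJob``, ``lock_filename`` and ``run_job`` are hypothetical. In
this module, the corresponding pieces are
``cfg.get_export_lockfilename_recipient_task()`` and
``ExportedTask.task_already_exported()`` within :func:`export_task`. Here a
plain ``set`` stands in for the sent log, and no lock is actually taken.

.. code-block:: python

    import os
    import re
    from typing import Callable, List, NamedTuple, Set


    class ExportJob(NamedTuple):
        # One job per recipient/table/PK, so one failing recipient
        # doesn't hold up the others.
        recipient_name: str
        basetable: str
        task_pk: int


    def lock_filename(lock_dir: str, job: ExportJob) -> str:
        # Hypothetical naming scheme: one lockfile per job; characters that
        # might be unsafe in a filename are replaced with underscores.
        parts = [job.recipient_name, job.basetable, str(job.task_pk)]
        safe = [re.sub(r"[^A-Za-z0-9_]", "_", p) for p in parts]
        return os.path.join(lock_dir, ".".join(safe) + ".lock")


    def run_job(
        job: ExportJob,
        sent_log: Set[ExportJob],
        do_export: Callable[[ExportJob], None],
    ) -> bool:
        # Real code would take the job's lockfile first and do this check
        # while holding it, as export_task() does.
        if job in sent_log:
            return False  # already done: a repeat run is a no-op
        do_export(job)
        sent_log.add(job)
        return True


    exported: List[ExportJob] = []
    sent: Set[ExportJob] = set()
    job = ExportJob("recipient_a", "some_task_table", 42)
    print(run_job(job, sent, exported.append))  # True
    print(run_job(job, sent, exported.append))  # False
    print(len(exported))  # 1
    print(lock_filename("locks", job))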

"""  # noqa

from contextlib import ExitStack
import json
import logging
import os
import sqlite3
import tempfile
from typing import (
    Any,
    Container,
    Dict,
    List,
    Generator,
    Optional,
    Set,
    Tuple,
    Type,
    TYPE_CHECKING,
)

from cardinal_pythonlib.classes import gen_all_subclasses
from cardinal_pythonlib.datetimefunc import (
    format_datetime,
    get_now_localtz_pendulum,
    get_tz_local,
    get_tz_utc,
)
from cardinal_pythonlib.email.sendmail import CONTENT_TYPE_TEXT
from cardinal_pythonlib.fileops import relative_filename_within_dir
from cardinal_pythonlib.json.serialize import register_for_json
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.pyramid.responses import (
    OdsResponse,
    SqliteBinaryResponse,
    TextAttachmentResponse,
    XlsxResponse,
    ZipResponse,
)
from cardinal_pythonlib.sizeformatter import bytes2human
from cardinal_pythonlib.sqlalchemy.session import get_safe_url_from_engine
import lockfile
from pendulum import DateTime as Pendulum, Duration
from pyramid.httpexceptions import HTTPBadRequest
from pyramid.renderers import render_to_response
from pyramid.response import Response
from sqlalchemy import insert
from sqlalchemy.engine import create_engine, Result
from sqlalchemy.orm import Session as SqlASession, sessionmaker
from sqlalchemy.sql.expression import text
from sqlalchemy.sql.schema import Column, MetaData, Table
from sqlalchemy.sql.sqltypes import Text

from camcops_server.cc_modules.cc_audit import audit
from camcops_server.cc_modules.cc_constants import DateFormat, JSON_INDENT
from camcops_server.cc_modules.cc_dataclasses import SummarySchemaInfo
from camcops_server.cc_modules.cc_db import (
    REMOVE_COLUMNS_FOR_SIMPLIFIED_SPREADSHEETS,
)
from camcops_server.cc_modules.cc_dump import copy_tasks_and_summaries
from camcops_server.cc_modules.cc_email import Email
from camcops_server.cc_modules.cc_exception import FhirExportException
from camcops_server.cc_modules.cc_exportmodels import (
    ExportedTask,
    ExportRecipient,
    gen_tasks_having_exportedtasks,
    get_collection_for_export,
)
from camcops_server.cc_modules.cc_forms import UserDownloadDeleteForm
from camcops_server.cc_modules.cc_pyramid import Routes, ViewArg, ViewParam
from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions
from camcops_server.cc_modules.cc_sqlalchemy import sql_from_sqlite_database
from camcops_server.cc_modules.cc_task import SNOMED_TABLENAME, Task
from camcops_server.cc_modules.cc_spreadsheet import (
    SpreadsheetCollection,
    SpreadsheetPage,
)
from camcops_server.cc_modules.celery import (
    create_user_download,
    email_basic_dump,
    export_task_backend,
    jittered_delay_s,
)

if TYPE_CHECKING:
    from pendulum import Interval

    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_taskcollection import TaskCollection

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Constants
# =============================================================================

INFOSCHEMA_PAGENAME = "_camcops_information_schema_columns"
SUMMARYSCHEMA_PAGENAME = "_camcops_column_explanations"
REMOVE_TABLES_FOR_SIMPLIFIED_SPREADSHEETS = {SNOMED_TABLENAME}
EMPTY_SET: Container[str] = set()


# =============================================================================
# Export tasks from the back end
# =============================================================================


def print_export_queue(
    req: "CamcopsRequest",
    recipient_names: List[str] = None,
    all_recipients: bool = False,
    via_index: bool = True,
    pretty: bool = False,
    debug_show_fhir: bool = False,
    debug_fhir_include_docs: bool = False,
) -> None:
    """
    Shows tasks that would be exported.

    - Called from the command line.

    Args:
        req:
            a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient_names:
            list of export recipient names (as per the config file)
        all_recipients:
            use all recipients?
        via_index:
            use the task index (faster)?
        pretty:
            use ``str(task)`` not ``repr(task)`` (prettier, but slower because
            it has to query the patient)
        debug_show_fhir:
            Show FHIR output for each task, as JSON?
        debug_fhir_include_docs:
            (If debug_show_fhir.) Include document content? Large!
    """
    recipients = req.get_export_recipients(
        recipient_names=recipient_names,
        all_recipients=all_recipients,
        save=False,
    )
    if not recipients:
        log.warning("No export recipients")
        return
    for recipient in recipients:
        log.info("Tasks to be exported for recipient: {}", recipient)
        collection = get_collection_for_export(
            req, recipient, via_index=via_index
        )
        for task in collection.gen_tasks_by_class():
            print(
                f"{recipient.recipient_name}: "
                f"{str(task) if pretty else repr(task)}"
            )
            if debug_show_fhir:
                try:
                    bundle = task.get_fhir_bundle(
                        req,
                        recipient,
                        skip_docs_if_other_content=not debug_fhir_include_docs,
                    )
                    bundle_str = json.dumps(
                        bundle.as_json(), indent=JSON_INDENT
                    )
                    log.info("FHIR output as JSON:\n{}", bundle_str)
                except FhirExportException as e:
                    log.info("Task has no non-document content:\n{}", e)


def export(
    req: "CamcopsRequest",
    recipient_names: List[str] = None,
    all_recipients: bool = False,
    via_index: bool = True,
    schedule_via_backend: bool = False,
) -> None:
    """
    Exports all relevant tasks (pending incremental exports, or everything if
    applicable) for specified export recipients.

    - Called from the command line, or from
      :func:`camcops_server.cc_modules.celery.export_to_recipient_backend`.
    - Calls :func:`export_whole_database` or :func:`export_tasks_individually`.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient_names: list of export recipient names (as per the config
            file)
        all_recipients: use all recipients?
        via_index: use the task index (faster)?
        schedule_via_backend: schedule jobs via the backend instead?
    """
    recipients = req.get_export_recipients(
        recipient_names=recipient_names, all_recipients=all_recipients
    )
    if not recipients:
        log.warning("No export recipients")
        return

    for recipient in recipients:
        log.info("Exporting to recipient: {}", recipient.recipient_name)
        if recipient.using_db():
            if schedule_via_backend:
                raise NotImplementedError(
                    "Not yet implemented: whole-database export via Celery "
                    "backend"
                )  # todo: implement whole-database export via Celery backend # noqa
            else:
                export_whole_database(req, recipient, via_index=via_index)
        else:
            # Non-database recipient.
            export_tasks_individually(
                req,
                recipient,
                via_index=via_index,
                schedule_via_backend=schedule_via_backend,
            )
        log.info("Finished exporting to {}", recipient.recipient_name)


def export_whole_database(
    req: "CamcopsRequest", recipient: ExportRecipient, via_index: bool = True
) -> None:
    """
    Exports to a database.

    - Called by :func:`export`.
    - Holds a recipient-specific "database" file lock in the process.

    Args:
        req:
            a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient:
            an
            :class:`camcops_server.cc_modules.cc_exportmodels.ExportRecipient`
        via_index:
            use the task index (faster)?
    """
    cfg = req.config
    lockfilename = cfg.get_export_lockfilename_recipient_db(
        recipient_name=recipient.recipient_name
    )
    try:
        with lockfile.FileLock(lockfilename, timeout=0):  # doesn't wait
            collection = get_collection_for_export(
                req, recipient, via_index=via_index
            )
            dst_engine = create_engine(
                recipient.db_url, echo=recipient.db_echo
            )
            log.info(
                "Exporting to database: {}",
                get_safe_url_from_engine(dst_engine),
            )
            dst_session = sessionmaker(bind=dst_engine)()  # type: SqlASession
            task_generator = gen_tasks_having_exportedtasks(collection)
            export_options = TaskExportOptions(
                include_blobs=recipient.db_include_blobs,
                db_patient_id_per_row=recipient.db_patient_id_per_row,
                db_make_all_tables_even_empty=True,
                db_include_summaries=recipient.db_add_summaries,
            )
            copy_tasks_and_summaries(
                tasks=task_generator,
                dst_engine=dst_engine,
                dst_session=dst_session,
                export_options=export_options,
                req=req,
            )
            dst_session.commit()
    except lockfile.AlreadyLocked:
        log.warning(
            "Export lockfile {!r} already locked by another process; "
            "aborting (another process is doing this work)",
            lockfilename,
        )
        # No need to retry by raising -- if someone else holds this lock, they
        # are doing the work that we wanted to do.


def export_tasks_individually(
    req: "CamcopsRequest",
    recipient: ExportRecipient,
    via_index: bool = True,
    schedule_via_backend: bool = False,
) -> None:
    """
    Exports all necessary tasks for a recipient.

    - Called by :func:`export`.
    - Calls :func:`export_task`, if ``schedule_via_backend`` is False.
    - Schedules :func:`camcops_server.cc_modules.celery.export_task_backend`,
      if ``schedule_via_backend`` is True, which calls :func:`export_task` in
      turn.

    Args:
        req:
            a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient:
            an
            :class:`camcops_server.cc_modules.cc_exportmodels.ExportRecipient`
        via_index:
            use the task index (faster)?
        schedule_via_backend:
            schedule jobs via the backend instead?
    """
    collection = get_collection_for_export(req, recipient, via_index=via_index)
    n_tasks = 0
    recipient_name = recipient.recipient_name
    if schedule_via_backend:
        for task_or_index in collection.gen_all_tasks_or_indexes():
            if isinstance(task_or_index, Task):
                basetable = task_or_index.tablename
                task_pk = task_or_index.pk
            else:
                basetable = task_or_index.task_table_name
                task_pk = task_or_index.task_pk
            log.info(
                "Scheduling job to export task {}.{} to {}",
                basetable,
                task_pk,
                recipient_name,
            )
            export_task_backend.delay(
                recipient_name=recipient_name,
                basetable=basetable,
                task_pk=task_pk,
            )
            n_tasks += 1
        log.info(
            f"Scheduled {n_tasks} background task exports to "
            f"{recipient_name}"
        )
    else:
        for task in collection.gen_tasks_by_class():
            # Do NOT use this to check the working of export_task_backend():
            # export_task_backend(recipient.recipient_name, task.tablename, task.pk)  # noqa
            # ... it will deadlock at the database (because we're already
            # within a query of some sort, I presume)
            export_task(req, recipient, task)
            n_tasks += 1
        log.info(f"Exported {n_tasks} tasks to {recipient_name}")


def export_task(
    req: "CamcopsRequest", recipient: ExportRecipient, task: Task
) -> None:
    """
    Exports a single task, checking that it remains valid to do so.

    - Called by :func:`export_tasks_individually` directly, or called via
      :func:`camcops_server.cc_modules.celery.export_task_backend` if
      :func:`export_tasks_individually` requested that.
    - Calls
      :meth:`camcops_server.cc_modules.cc_exportmodels.ExportedTask.export`.
    - For FHIR, holds a recipient-specific "FHIR" file lock during export.
    - Always holds a recipient-and-task-specific file lock during export.

    Args:
        req:
            a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient:
            an
            :class:`camcops_server.cc_modules.cc_exportmodels.ExportRecipient`
        task:
            a :class:`camcops_server.cc_modules.cc_task.Task`
    """

    # Double-check it's OK! Just in case, for example, an old backend task has
    # persisted, or someone's managed to get an iffy back-end request in some
    # other way.
    if not recipient.is_task_suitable(task):
        # Warning will already have been emitted (by is_task_suitable).
        return

    cfg = req.config
    lockfilename = cfg.get_export_lockfilename_recipient_task(
        recipient_name=recipient.recipient_name,
        basetable=task.tablename,
        pk=task.pk,
    )
    dbsession = req.dbsession
    with ExitStack() as stack:

        if recipient.using_fhir() and not recipient.fhir_concurrent:
            # Some FHIR servers struggle with parallel processing, so we hold
            # a lock to serialize them. See notes in cc_fhir.py.
            #
            # We always use the order (1) FHIR lockfile, (2) task lockfile, to
            # avoid a deadlock.
            #
            # (Note that it is impossible that a non-FHIR task export grabs the
            # second of these without the first, because the second lockfile is
            # recipient-specific and the recipient details include the fact
            # that it is a FHIR recipient.)
            fhir_lockfilename = cfg.get_export_lockfilename_recipient_fhir(
                recipient_name=recipient.recipient_name
            )
            try:
                stack.enter_context(
                    lockfile.FileLock(
                        fhir_lockfilename, timeout=jittered_delay_s()
                    )
                    # waits for a while
                )
            except (lockfile.AlreadyLocked, lockfile.LockTimeout):
                # With a positive timeout, lockfile raises LockTimeout (not
                # AlreadyLocked) once the wait expires.
                log.warning(
                    "Export lockfile {!r} already locked by another process; "
                    "will try again later",
                    fhir_lockfilename,
                )
                raise
                # We will reschedule via Celery; see "self.retry(...)" in
                # celery.py

        try:
            stack.enter_context(
                lockfile.FileLock(lockfilename, timeout=0)  # doesn't wait
            )
            # We recheck the export status once we hold the lock, in case
            # multiple jobs are competing to export it.
            if ExportedTask.task_already_exported(
                dbsession=dbsession,
                recipient_name=recipient.recipient_name,
                basetable=task.tablename,
                task_pk=task.pk,
            ):
                log.info(
                    "Task {!r} already exported to recipient {}; ignoring",
                    task,
                    recipient,
                )
                # Not a warning; it's normal to see these because it allows the
                # client API to skip some checks for speed.
                return
            # OK; safe to export now.
            et = ExportedTask(recipient, task)
            dbsession.add(et)
            et.export(req)
            dbsession.commit()  # so the ExportedTask is visible to others ASAP
        except lockfile.AlreadyLocked:
            log.warning(
                "Export lockfile {!r} already locked by another process; "
                "aborting (another process is doing this work)",
                lockfilename,
            )


# =============================================================================
# Helpers for task collection export functions
# =============================================================================


def gen_audited_tasks_for_task_class(
    collection: "TaskCollection",
    cls: Type[Task],
    audit_descriptions: List[str],
) -> Generator[Task, None, None]:
    """
    Generates tasks from a collection, for a given task class, simultaneously
    adding to an audit description. Used for user-triggered downloads.

    Args:
        collection:
            a
            :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`
        cls:
            the task class to generate
        audit_descriptions:
            list of strings to be modified; a description of the form
            ``"tablename: pk1,pk2,..."`` is appended once the generator has
            been exhausted

    Yields:
        :class:`camcops_server.cc_modules.cc_task.Task` objects
    """
    pklist = []  # type: List[int]
    for task in collection.tasks_for_task_class(cls):
        pklist.append(task.pk)
        yield task
    audit_descriptions.append(
        f"{cls.__tablename__}: {','.join(str(pk) for pk in pklist)}"
    )


def gen_audited_tasks_by_task_class(
    collection: "TaskCollection", audit_descriptions: List[str]
) -> Generator[Task, None, None]:
    """
    Generates tasks from a collection, across task classes, simultaneously
    adding to an audit description. Used for user-triggered downloads.

    Args:
        collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`
        audit_descriptions: list of strings to be modified (one entry is
            appended per task class, as each class's tasks are exhausted)

    Yields:
        :class:`camcops_server.cc_modules.cc_task.Task` objects
    """  # noqa
    for cls in collection.task_classes():
        for task in gen_audited_tasks_for_task_class(
            collection, cls, audit_descriptions
        ):
            yield task


def get_information_schema_query(req: "CamcopsRequest") -> Result:
    """
    Executes a query that fetches the INFORMATION_SCHEMA.COLUMNS information
    for our source database, and returns the resulting SQLAlchemy
    :class:`Result`.

    This is not sensitive; there is no data, just structure/comments.
    """
    # Find our database name
    # https://stackoverflow.com/questions/53554458/sqlalchemy-get-database-name-from-engine
    dbname = req.engine.url.database
    # Query the information schema for our database.
    # https://docs.sqlalchemy.org/en/13/core/sqlelement.html#sqlalchemy.sql.expression.text  # noqa
    query = text(
        """
        SELECT *
        FROM information_schema.columns
        WHERE table_schema = :dbname
        """
    ).bindparams(dbname=dbname)
    return req.dbsession.execute(query)


def get_information_schema_spreadsheet_page(
    req: "CamcopsRequest", page_name: str = INFOSCHEMA_PAGENAME
) -> SpreadsheetPage:
    """
    Returns the server database's ``INFORMATION_SCHEMA.COLUMNS`` table as a
    :class:`camcops_server.cc_modules.cc_spreadsheet.SpreadsheetPage`.
    """
    result = get_information_schema_query(req)
    return SpreadsheetPage.from_result(page_name, result)


def write_information_schema_to_dst(
    req: "CamcopsRequest",
    dst_session: SqlASession,
    dest_table_name: str = INFOSCHEMA_PAGENAME,
) -> None:
    """
    Writes the server's information schema to a separate database session
    (which will be an SQLite database being created for download).

    There must be no open transactions (i.e. please COMMIT before you call
    this function), since we need to create a table.
    """
    # 1. Read the structure of INFORMATION_SCHEMA.COLUMNS itself.
    # https://stackoverflow.com/questions/21770829/sqlalchemy-copy-schema-and-data-of-subquery-to-another-database  # noqa
    src_engine = req.engine
    dst_engine = dst_session.bind
    metadata = MetaData()
    table = Table(
        "columns",  # table name; see also "schema" argument
        metadata,  # "load with the destination metadata"
        # Override some specific column types by hand, or they'll fail as
        # SQLAlchemy fails to reflect the MySQL LONGTEXT type properly:
        Column("COLUMN_DEFAULT", Text),
        Column("COLUMN_KEY", Text),
        Column("COLUMN_TYPE", Text),
        Column("DATA_TYPE", Text),
        Column("GENERATION_EXPRESSION", Text),
        autoload_with=src_engine,  # "read (reflect) structure from the source"
        schema="information_schema",  # schema
    )
    # 2. Write that structure to our new database.
    table.name = dest_table_name  # create it with a different name
    table.schema = ""  # we don't have a schema in the destination database
    table.create(dst_engine)  # CREATE TABLE
    # 3. Fetch data.
    query = get_information_schema_query(req)
    # 4. Write the data.
    for row in query:
        dst_session.execute(insert(table).values(row))
    # 5. COMMIT
    dst_session.commit()


# =============================================================================
# Convert task collections to different export formats for user download
# =============================================================================


@register_for_json
class DownloadOptions(object):
    """
    Represents options for the process of the user downloading tasks.
    """

    DELIVERY_MODES = [ViewArg.DOWNLOAD, ViewArg.EMAIL, ViewArg.IMMEDIATELY]

    def __init__(
        self,
        user_id: int,
        viewtype: str,
        delivery_mode: str,
        spreadsheet_simplified: bool = False,
        spreadsheet_sort_by_heading: bool = False,
        db_include_blobs: bool = False,
        db_patient_id_per_row: bool = False,
        include_information_schema_columns: bool = True,
        include_summary_schema: bool = True,
    ) -> None:
        """
        Args:
            user_id:
                ID of the user creating the request (may be needed to pass to
                the back-end)
            viewtype:
                file format for receiving data (e.g. XLSX, SQLite)
            delivery_mode:
                method of delivery (e.g. immediate, e-mail); must be one of
                ``DELIVERY_MODES``
            spreadsheet_simplified:
                (For spreadsheets.)
                Use a simplified format, omitting some tables and columns?
            spreadsheet_sort_by_heading:
                (For spreadsheets.)
                Sort columns within each page by heading name?
            db_include_blobs:
                (For database downloads.)
                Include BLOBs?
            db_patient_id_per_row:
                (For database downloads.)
                Denormalize by including the patient ID in all rows of
                patient-related tables?
            include_information_schema_columns:
                Include descriptions of the database source columns?
            include_summary_schema:
                Include descriptions of summary columns and other columns in
                output spreadsheets?
        """
        assert delivery_mode in self.DELIVERY_MODES
        self.user_id = user_id
        self.viewtype = viewtype
        self.delivery_mode = delivery_mode
        self.spreadsheet_simplified = spreadsheet_simplified
        self.spreadsheet_sort_by_heading = spreadsheet_sort_by_heading
        self.db_include_blobs = db_include_blobs
        self.db_patient_id_per_row = db_patient_id_per_row
        self.include_information_schema_columns = (
            include_information_schema_columns
        )
        self.include_summary_schema = include_summary_schema
794 

795 

796class TaskCollectionExporter(object): 

797 """ 

798 Class to provide tasks for user download. 

799 """ 

800 

801 def __init__( 

802 self, 

803 req: "CamcopsRequest", 

804 collection: "TaskCollection", 

805 options: DownloadOptions, 

806 ): 

807 """ 

808 Args: 

809 req: 

810 a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

811 collection: 

812 a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection` 

813 options: 

814 :class:`DownloadOptions` governing the download 

815 """ # noqa 

816 self.req = req 

817 self.collection = collection 

818 self.options = options 

819 

820 @property 

821 def viewtype(self) -> str: 

822 raise NotImplementedError("Exporter needs to implement 'viewtype'") 

823 

824 @property 

825 def file_extension(self) -> str: 

826 raise NotImplementedError( 

827 "Exporter needs to implement 'file_extension'" 

828 ) 

829 

830 def get_filename(self) -> str: 

831 """ 

832 Returns the filename for the download. 

833 """ 

834 timestamp = format_datetime(self.req.now, DateFormat.FILENAME) 

835 return f"CamCOPS_dump_{timestamp}.{self.file_extension}" 

836 

837 def immediate_response(self, req: "CamcopsRequest") -> Response: 

838 """ 

839 Returns either a :class:`Response` with the data, or a 

840 :class:`Response` saying how the user will obtain their data later. 

841 

842 Args: 

843 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

844 """ 

845 if self.options.delivery_mode == ViewArg.EMAIL: 

846 self.schedule_email() 

847 return render_to_response( 

848 "email_scheduled.mako", dict(), request=req 

849 ) 

850 elif self.options.delivery_mode == ViewArg.DOWNLOAD: 

851 self.schedule_download() 

852 return render_to_response( 

853 "download_scheduled.mako", dict(), request=req 

854 ) 

855 else: # ViewArg.IMMEDIATELY 

856 return self.download_now() 

857 

858 def download_now(self) -> Response: 

859 """ 

860 Download the data dump in the selected format 

861 """ 

862 filename, body = self.to_file() 

863 return self.get_data_response(body=body, filename=filename) 

864 
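The delivery-mode dispatch in `immediate_response()` and the chain from `download_now()` to `to_file()` to `get_file_body()` form a template-method pattern. The base class fixes the workflow, and subclasses only supply format details such as the file extension and body.

Below is a minimal standalone sketch of that shape, under these assumptions:
- `DeliveryMode`, `Exporter` and `TxtExporter` are hypothetical names.
- Background scheduling (Celery's `.delay()` in the real code) is replaced by appending to a list.
- Plain tuples and strings stand in for Pyramid responses.

```python
from enum import Enum
from typing import List, Tuple, Union


class DeliveryMode(str, Enum):
    # Hypothetical stand-ins for ViewArg.IMMEDIATELY / EMAIL / DOWNLOAD.
    IMMEDIATELY = "immediately"
    EMAIL = "email"
    DOWNLOAD = "download"


class Exporter:
    """Minimal template-method base: subclasses supply format details."""

    file_extension = ""  # subclasses override

    def __init__(self, delivery_mode: DeliveryMode, timestamp: str) -> None:
        self.delivery_mode = delivery_mode
        self.timestamp = timestamp
        self.scheduled_jobs: List[str] = []  # stand-in for a task queue

    def get_filename(self) -> str:
        return f"CamCOPS_dump_{self.timestamp}.{self.file_extension}"

    def get_file_body(self) -> bytes:
        raise NotImplementedError("Exporter needs to implement 'get_file_body'")

    def to_file(self) -> Tuple[str, bytes]:
        return self.get_filename(), self.get_file_body()

    def immediate_response(self) -> Union[str, Tuple[str, bytes]]:
        if self.delivery_mode == DeliveryMode.EMAIL:
            self.scheduled_jobs.append("email")  # real code: task.delay(...)
            return "email_scheduled"
        if self.delivery_mode == DeliveryMode.DOWNLOAD:
            self.scheduled_jobs.append("download")
            return "download_scheduled"
        # IMMEDIATELY: build the file now and hand it straight back.
        return self.to_file()


class TxtExporter(Exporter):
    file_extension = "txt"

    def get_file_body(self) -> bytes:
        return b"hello"
```

For example, `TxtExporter(DeliveryMode.IMMEDIATELY, "2025-07-15T1423").immediate_response()` builds the file at once. The two scheduled modes only record a job and return a placeholder page name.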

865 def schedule_email(self) -> None: 

866 """ 

867 Schedule the export asynchronously and e-mail the logged-in user

868 when done.

869 """ 

870 email_basic_dump.delay(self.collection, self.options) 

871 

872 def send_by_email(self) -> None: 

873 """ 

874 Send the data dump by e-mail to the logged-in user.

875 """ 

876 _ = self.req.gettext 

877 config = self.req.config 

878 

879 filename, body = self.to_file() 

880 email_to = self.req.user.email 

881 email = Email( 

882 # date: automatic 

883 from_addr=config.email_from, 

884 to=email_to, 

885 subject=_("CamCOPS research data dump"), 

886 body=_("The research data dump you requested is attached."), 

887 content_type=CONTENT_TYPE_TEXT, 

888 charset="utf8", 

889 attachments_binary=[(filename, body)], 

890 ) 

891 email.send( 

892 host=config.email_host, 

893 username=config.email_host_username, 

894 password=config.email_host_password, 

895 port=config.email_port, 

896 use_tls=config.email_use_tls, 

897 ) 

898 

899 if email.sent: 

900 log.info(f"Research dump emailed to {email_to}") 

901 else: 

902 log.error(f"Failed to email research dump to {email_to}") 

903 
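`send_by_email()` builds a plain-text message with the dump as a single binary attachment, using CamCOPS's own `Email` class. For comparison, here is a rough standard-library equivalent built on `email.message.EmailMessage`. This is an assumption for illustration, not how CamCOPS's `Email` is implemented, and the generic `application/octet-stream` MIME type is also an assumption.

The function only builds the message. Sending would typically go through `smtplib.SMTP`, calling `starttls()` when TLS is configured.

```python
from email.message import EmailMessage


def build_dump_email(
    from_addr: str, to_addr: str, filename: str, body: bytes
) -> EmailMessage:
    """Build (but do not send) an e-mail carrying a binary data dump."""
    msg = EmailMessage()
    msg["From"] = from_addr
    msg["To"] = to_addr
    msg["Subject"] = "CamCOPS research data dump"
    msg.set_content("The research data dump you requested is attached.")
    # Generic binary MIME type; a real exporter might choose one per format.
    # Adding an attachment turns the message into multipart/mixed.
    msg.add_attachment(
        body,
        maintype="application",
        subtype="octet-stream",
        filename=filename,
    )
    return msg
```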

904 def schedule_download(self) -> None: 

905 """ 

906 Schedule a background export to a file that the user can download 

907 later. 

908 """ 

909 create_user_download.delay(self.collection, self.options) 

910 

911 def create_user_download_and_email(self) -> None: 

912 """ 

913 Creates a user download, and e-mails the user to let them know. 

914 """ 

915 _ = self.req.gettext 

916 config = self.req.config 

917 

918 download_dir = self.req.user_download_dir 

919 space = self.req.user_download_bytes_available 

920 filename, contents = self.to_file() 

921 size = len(contents) 

922 

923 if size > space: 

924 # Not enough space 

925 total_permitted = self.req.user_download_bytes_permitted 

926 msg = _( 

927 "You do not have enough space to create this download. " 

928 "You are allowed {total_permitted} bytes and you have " 

929 "{space} bytes free. This download would need {size} bytes." 

930 ).format(total_permitted=total_permitted, space=space, size=size) 

931 else: 

932 # Create file 

933 fullpath = os.path.join(download_dir, filename) 

934 try: 

935 with open(fullpath, "wb") as f: 

936 f.write(contents) 

937 # Success 

938 log.info(f"Created user download: {fullpath}") 

939 msg = ( 

940 _( 

941 "The research data dump you requested is ready to be " 

942 "downloaded. You will find it in your download area. " 

943 "It is called %s" 

944 ) 

945 % filename 

946 ) 

947 except Exception as e: 

948 # Some other error 

949 msg = _( 

950 "Failed to create file {filename}. Error was: {message}" 

951 ).format(filename=filename, message=e) 

952 

953 # E-mail the user, if they have an e-mail address 

954 email_to = self.req.user.email 

955 if email_to: 

956 email = Email( 

957 # date: automatic 

958 from_addr=config.email_from, 

959 to=email_to, 

960 subject=_("CamCOPS research data dump"), 

961 body=msg, 

962 content_type=CONTENT_TYPE_TEXT, 

963 charset="utf8", 

964 ) 

965 email.send( 

966 host=config.email_host, 

967 username=config.email_host_username, 

968 password=config.email_host_password, 

969 port=config.email_port, 

970 use_tls=config.email_use_tls, 

971 ) 

972 
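`create_user_download_and_email()` refuses to write the file when its size exceeds the user's remaining quota; otherwise it writes the file into the user's download directory. The sketch below isolates that decision.

`save_if_space()` is a hypothetical helper. It takes the free space as an argument instead of reading it from the request. It also catches `OSError` rather than the broader `Exception` used above.

```python
import os


def save_if_space(
    download_dir: str, filename: str, contents: bytes, bytes_available: int
) -> str:
    """
    Write ``contents`` into ``download_dir`` only if it fits in the user's
    remaining quota; return a human-readable status message.
    """
    size = len(contents)
    if size > bytes_available:
        # Nothing is written when the quota would be exceeded.
        return (
            f"Not enough space: {bytes_available} bytes free, "
            f"but this download needs {size} bytes."
        )
    fullpath = os.path.join(download_dir, filename)
    try:
        with open(fullpath, "wb") as f:
            f.write(contents)
    except OSError as e:
        return f"Failed to create file {filename}. Error was: {e}"
    return f"Download ready: {filename}"
```

Checking the size before touching the disk means a user who is over quota never leaves a partial file behind.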

973 def get_data_response(self, body: bytes, filename: str) -> Response: 

974 raise NotImplementedError("Exporter needs to implement 'get_data_response'") 

975 

976 def to_file(self) -> Tuple[str, bytes]: 

977 """ 

978 Returns the tuple ``filename, file_contents``. 

979 """ 

980 return self.get_filename(), self.get_file_body() 

981 

982 def get_file_body(self) -> bytes: 

983 """ 

984 Returns binary data to be stored as a file. 

985 """ 

986 raise NotImplementedError( 

987 "Exporter needs to implement 'get_file_body'" 

988 ) 

989 

990 def get_spreadsheet_collection(self) -> SpreadsheetCollection: 

991 """ 

992 Converts the collection of tasks to a collection of spreadsheet-style 

993 data. Also audits the request as a basic data dump. 

994 

995 Returns: 

996 a 

997 :class:`camcops_server.cc_modules.cc_spreadsheet.SpreadsheetCollection` 

998 object 

999 """ 

1000 audit_descriptions = [] # type: List[str] 

1001 options = self.options 

1002 if options.spreadsheet_simplified: 

1003 summary_exclusion_tables = ( 

1004 REMOVE_TABLES_FOR_SIMPLIFIED_SPREADSHEETS 

1005 ) 

1006 summary_exclusion_columns = ( 

1007 REMOVE_COLUMNS_FOR_SIMPLIFIED_SPREADSHEETS 

1008 ) 

1009 else: 

1010 summary_exclusion_tables = EMPTY_SET # type: ignore[assignment] 

1011 summary_exclusion_columns = EMPTY_SET # type: ignore[assignment] 

1012 # Task may return >1 sheet for output (e.g. for subtables). 

1013 coll = SpreadsheetCollection() 

1014 

1015 # Iterate through tasks, creating the spreadsheet collection 

1016 schema_elements = set() # type: Set[SummarySchemaInfo] 

1017 for cls in self.collection.task_classes(): 

1018 schema_done = False 

1019 for task in gen_audited_tasks_for_task_class( 

1020 self.collection, cls, audit_descriptions 

1021 ): 

1022 # Task data 

1023 coll.add_pages(task.get_spreadsheet_pages(self.req)) 

1024 if not schema_done and options.include_summary_schema: 

1025 # Schema (including summary explanations) 

1026 schema_elements |= task.get_spreadsheet_schema_elements( 

1027 self.req 

1028 ) 

1029 # We just need this from one task instance. 

1030 schema_done = True 

1031 

1032 if options.include_summary_schema: 

1033 coll.add_page( 

1034 SpreadsheetPage( 

1035 name=SUMMARYSCHEMA_PAGENAME, 

1036 rows=[ 

1037 si.as_dict 

1038 for si in sorted(schema_elements) 

1039 if si.column_name not in summary_exclusion_columns 

1040 and si.table_name not in summary_exclusion_tables 

1041 ], 

1042 ) 

1043 ) 

1044 

1045 if options.include_information_schema_columns: 

1046 # Source database information schema 

1047 coll.add_page(get_information_schema_spreadsheet_page(self.req)) 

1048 

1049 # Simplify 

1050 if options.spreadsheet_simplified: 

1051 coll.delete_pages(summary_exclusion_tables) 

1052 coll.delete_columns(summary_exclusion_columns) 

1053 

1054 # Sort 

1055 coll.sort_pages() 

1056 if options.spreadsheet_sort_by_heading: 

1057 coll.sort_headings_within_all_pages() 

1058 

1059 # Audit 

1060 audit(self.req, f"Basic dump: {'; '.join(audit_descriptions)}") 

1061 

1062 return coll 

1063 
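`get_spreadsheet_collection()` builds its summary-schema page in three steps:
- Each task class contributes its schema elements once.
- The per-class sets are merged by set union, so duplicates vanish.
- Excluded tables and columns are filtered out before sorting.

The sketch below reproduces that logic with a hypothetical frozen dataclass `SchemaInfo` standing in for `SummarySchemaInfo`. The code above only implies its `table_name` and `column_name` attributes, hashability and sortability. The `comment` field, and using `dataclasses.asdict()` in place of `as_dict`, are assumptions.

```python
from dataclasses import asdict, dataclass
from typing import Dict, Iterable, List, Set


@dataclass(frozen=True, order=True)
class SchemaInfo:
    # Hypothetical stand-in for SummarySchemaInfo. frozen=True makes it
    # hashable (so it can live in a set); order=True makes it sortable
    # by (table_name, column_name, comment).
    table_name: str
    column_name: str
    comment: str = ""


def summary_schema_rows(
    per_task_elements: Iterable[Set[SchemaInfo]],
    excluded_tables: Set[str],
    excluded_columns: Set[str],
) -> List[Dict[str, str]]:
    """Union schema elements, drop excluded ones, and return sorted rows."""
    elements: Set[SchemaInfo] = set()
    for task_elements in per_task_elements:
        elements |= task_elements  # set union removes duplicates
    return [
        asdict(si)
        for si in sorted(elements)
        if si.column_name not in excluded_columns
        and si.table_name not in excluded_tables
    ]
```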

1064 

1065class OdsExporter(TaskCollectionExporter): 

1066 """ 

1067 Converts a set of tasks to an OpenOffice ODS file. 

1068 """ 

1069 

1070 file_extension = "ods" 

1071 viewtype = ViewArg.ODS 

1072 

1073 def get_file_body(self) -> bytes: 

1074 return self.get_spreadsheet_collection().as_ods() 

1075 

1076 def get_data_response(self, body: bytes, filename: str) -> Response: 

1077 return OdsResponse(body=body, filename=filename) 

1078 

1079 

1080class RExporter(TaskCollectionExporter): 

1081 """ 

1082 Converts a set of tasks to an R script. 

1083 """ 

1084 

1085 file_extension = "R" 

1086 viewtype = ViewArg.R 

1087 

1088 def __init__(self, *args: Any, **kwargs: Any) -> None: 

1089 super().__init__(*args, **kwargs) 

1090 self.encoding = "utf-8" 

1091 

1092 def get_file_body(self) -> bytes: 

1093 return self.get_r_script().encode(self.encoding) 

1094 

1095 def get_r_script(self) -> str: 

1096 return self.get_spreadsheet_collection().as_r() 

1097 

1098 def get_data_response(self, body: bytes, filename: str) -> Response: 

1099 # Reuse the body we were given, rather than regenerating (and re-auditing) the dump: 

1100 r_script = body.decode(self.encoding) 

1101 return TextAttachmentResponse(body=r_script, filename=filename) 

1102 

1103 

1104class TsvZipExporter(TaskCollectionExporter): 

1105 """ 

1106 Converts a set of tasks to a set of TSV (tab-separated value) files (one 

1107 per table) in a ZIP file. 

1108 """ 

1109 

1110 file_extension = "zip" 

1111 viewtype = ViewArg.TSV_ZIP 

1112 

1113 def get_file_body(self) -> bytes: 

1114 return self.get_spreadsheet_collection().as_zip() 

1115 

1116 def get_data_response(self, body: bytes, filename: str) -> Response: 

1117 return ZipResponse(body=body, filename=filename) 

1118 
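`TsvZipExporter` delegates to `SpreadsheetCollection.as_zip()`, whose implementation is not shown here. As an illustration only, a standard-library sketch of the same idea (one tab-separated file per table, packed into an in-memory ZIP) could use `csv.DictWriter` and `zipfile`. The `pages` mapping from table name to a list of row dictionaries is a hypothetical input format.

```python
import csv
import io
import zipfile
from typing import Dict, List


def tables_to_tsv_zip(pages: Dict[str, List[Dict[str, object]]]) -> bytes:
    """
    Write one TSV file per table into an in-memory ZIP archive and return
    the archive's bytes.
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name, rows in pages.items():
            # Header row: union of all row keys, in first-seen order.
            headings: List[str] = []
            for row in rows:
                for key in row:
                    if key not in headings:
                        headings.append(key)
            text = io.StringIO()
            writer = csv.DictWriter(
                text, fieldnames=headings, delimiter="\t", lineterminator="\n"
            )
            writer.writeheader()
            writer.writerows(rows)  # missing keys are written as ""
            zf.writestr(f"{name}.tsv", text.getvalue().encode("utf-8"))
    return buffer.getvalue()
```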

1119 

1120class XlsxExporter(TaskCollectionExporter): 

1121 """ 

1122 Converts a set of tasks to an Excel XLSX file. 

1123 """ 

1124 

1125 file_extension = "xlsx" 

1126 viewtype = ViewArg.XLSX 

1127 

1128 def get_file_body(self) -> bytes: 

1129 return self.get_spreadsheet_collection().as_xlsx() 

1130 

1131 def get_data_response(self, body: bytes, filename: str) -> Response: 

1132 return XlsxResponse(body=body, filename=filename) 

1133 

1134 

1135class SqliteExporter(TaskCollectionExporter): 

1136 """ 

1137 Converts a set of tasks to an SQLite binary file. 

1138 """ 

1139 

1140 file_extension = "sqlite" 

1141 viewtype = ViewArg.SQLITE 

1142 db_basename = "temp.sqlite3" 

1143 

1144 def get_export_options(self) -> TaskExportOptions: 

1145 return TaskExportOptions( 

1146 include_blobs=self.options.db_include_blobs, 

1147 db_include_summaries=True, 

1148 db_make_all_tables_even_empty=True, # debatable, but more consistent! # noqa 

1149 db_patient_id_per_row=self.options.db_patient_id_per_row, 

1150 ) 

1151 

1152 def get_sqlite_data_as_text(self) -> str: 

1153 """ 

1154 Returns data as SQL text to create it. 

1155 """ 

1156 with tempfile.TemporaryDirectory() as tmpdirname: 

1157 db_filename = os.path.join(tmpdirname, self.db_basename) 

1158 self._write_to_sqlite_file(db_filename) 

1159 connection = sqlite3.connect( 

1160 db_filename 

1161 ) # type: sqlite3.Connection 

1162 sql_text = sql_from_sqlite_database(connection) 

1163 connection.close() 

1164 return sql_text 

1165 
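`get_sqlite_data_as_text()` relies on `sql_from_sqlite_database()` to turn the temporary database back into SQL. The standard library offers the same kind of dump through `sqlite3.Connection.iterdump()`, which the comments in `_write_to_sqlite_file()` also mention. Whether `sql_from_sqlite_database()` wraps it exactly is an assumption.

This sketch also uses `contextlib.closing()` so the connection is closed even if the dump fails. A `sqlite3` connection's own context manager only commits or rolls back; it does not close the connection.

```python
import sqlite3
from contextlib import closing


def sqlite_to_sql_text(db_filename: str) -> str:
    """Return SQL text that would recreate the SQLite database."""
    # closing() guarantees the connection is closed even if iterdump fails.
    with closing(sqlite3.connect(db_filename)) as connection:
        return "\n".join(connection.iterdump())
```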

1166 def get_sqlite_data_as_bytes(self) -> bytes: 

1167 """ 

1168 Returns data as a binary SQLite database. 

1169 """ 

1170 with tempfile.TemporaryDirectory() as tmpdirname: 

1171 db_filename = os.path.join(tmpdirname, self.db_basename) 

1172 self._write_to_sqlite_file(db_filename) 

1173 with open(db_filename, "rb") as f: 

1174 binary_contents = f.read() 

1175 return binary_contents 

1176 

1177 def _write_to_sqlite_file(self, db_filename: str) -> None: 

1178 # --------------------------------------------------------------------- 

1179 # Create memory file, dumper, and engine 

1180 # --------------------------------------------------------------------- 

1181 

1182 # This approach failed: 

1183 # 

1184 # memfile = io.StringIO() 

1185 # 

1186 # def dump(querysql, *multiparams, **params): 

1187 # compsql = querysql.compile(dialect=engine.dialect) 

1188 # memfile.write("{};\n".format(compsql)) 

1189 # 

1190 # engine = create_engine('{dialect}://'.format(dialect=dialect_name), 

1191 # strategy='mock', executor=dump) 

1192 # dst_session = sessionmaker(bind=engine)() # type: SqlASession 

1193 # 

1194 # ... you get the error 

1195 # AttributeError: 'MockConnection' object has no attribute 'begin' 

1196 # ... which is fair enough. 

1197 # 

1198 # Next best thing: SQLite database. 

1199 # Two ways to deal with it: 

1200 # (a) duplicate our C++ dump code (which itself duplicates the SQLite 

1201 # command-line executable's dump facility), then create the 

1202 # database, dump it to a string, serve the string; or 

1203 # (b) offer the binary SQLite file. 

1204 # Or... (c) both. 

1205 # Aha! sqlite3.Connection.iterdump() does this for us. 

1206 # 

1207 # If we create an in-memory database using create_engine('sqlite://'), 

1208 # can we get the binary contents out? Don't think so. 

1209 # 

1210 # So we should first create a temporary on-disk file, then use that. 

1211 

1212 # --------------------------------------------------------------------- 

1213 # Make temporary file (one whose filename we can know). 

1214 # --------------------------------------------------------------------- 

1215 # We could use tempfile.mkstemp() for security, or NamedTemporaryFile, 

1216 # which is a bit easier. However, you can't necessarily open the file 

1217 # again under all OSs, so that's no good. The final option is 

1218 # TemporaryDirectory, which is secure and convenient. 

1219 # 

1220 # https://docs.python.org/3/library/tempfile.html 

1221 # https://security.openstack.org/guidelines/dg_using-temporary-files-securely.html # noqa 

1222 # https://stackoverflow.com/questions/3924117/how-to-use-tempfile-namedtemporaryfile-in-python # noqa 

1223 # --------------------------------------------------------------------- 

1224 # Make SQLAlchemy session 

1225 # --------------------------------------------------------------------- 

1226 url = "sqlite:///" + db_filename 

1227 engine = create_engine(url, echo=False) 

1228 dst_session: SqlASession = sessionmaker(bind=engine)() 

1229 # --------------------------------------------------------------------- 

1230 # Iterate through tasks, creating tables as we need them. 

1231 # --------------------------------------------------------------------- 

1232 audit_descriptions = [] # type: List[str] 

1233 task_generator = gen_audited_tasks_by_task_class( 

1234 self.collection, audit_descriptions 

1235 ) 

1236 # --------------------------------------------------------------------- 

1237 # Next bit very tricky. We're trying to achieve several things: 

1238 # - a copy of part of the database structure 

1239 # - a copy of part of the data, with relationships intact 

1240 # - nothing sensitive (e.g. full User records) going through 

1241 # - adding new columns for Task objects offering summary values 

1242 # - Must treat tasks all together, because otherwise we will insert 

1243 # duplicate dependency objects like Group objects. 

1244 # --------------------------------------------------------------------- 

1245 copy_tasks_and_summaries( 

1246 tasks=task_generator, 

1247 dst_engine=engine, 

1248 dst_session=dst_session, 

1249 export_options=self.get_export_options(), 

1250 req=self.req, 

1251 ) 

1252 dst_session.commit() 

1253 if self.options.include_information_schema_columns: 

1254 # Must have committed before we do this: 

1255 write_information_schema_to_dst(self.req, dst_session) 

1256 # --------------------------------------------------------------------- 

1257 # Audit 

1258 # --------------------------------------------------------------------- 

1259 audit(self.req, f"SQL dump: {'; '.join(audit_descriptions)}") 

1260 
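The comments in `_write_to_sqlite_file()` settle on `tempfile.TemporaryDirectory` because it gives a known, private path that can be reopened on any OS, and it cleans itself up afterwards. Below is a minimal sketch of that pattern. It uses plain `sqlite3` in place of the SQLAlchemy session and `copy_tasks_and_summaries()`, and the `populate` callback is hypothetical.

```python
import os
import sqlite3
import tempfile
from contextlib import closing
from typing import Callable


def build_sqlite_bytes(populate: Callable[[sqlite3.Connection], None]) -> bytes:
    """
    Create an SQLite file in a private temporary directory, let ``populate``
    fill it, and return the file's raw bytes. The directory (and the file in
    it) is deleted when the ``with`` block exits.
    """
    with tempfile.TemporaryDirectory() as tmpdirname:
        db_filename = os.path.join(tmpdirname, "temp.sqlite3")
        with closing(sqlite3.connect(db_filename)) as connection:
            populate(connection)
            connection.commit()
        # The connection is closed before reading, so all data is on disk.
        with open(db_filename, "rb") as f:
            return f.read()
```

Every valid SQLite 3 file starts with the 16-byte header `SQLite format 3\0`. That header is a cheap sanity check on the returned bytes.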

1261 def get_file_body(self) -> bytes: 

1262 return self.get_sqlite_data_as_bytes() 

1263 

1264 def get_data_response(self, body: bytes, filename: str) -> Response: 

1265 return SqliteBinaryResponse(body=body, filename=filename) 

1266 

1267 

1268class SqlExporter(SqliteExporter): 

1269 """ 

1270 Converts a set of tasks to the textual SQL needed to create an SQLite file. 

1271 """ 

1272 

1273 file_extension = "sql" 

1274 viewtype = ViewArg.SQL 

1275 

1276 def __init__(self, *args: Any, **kwargs: Any) -> None: 

1277 super().__init__(*args, **kwargs) 

1278 self.encoding = "utf-8" 

1279 

1280 def get_file_body(self) -> bytes: 

1281 return self.get_sql().encode(self.encoding) 

1282 

1283 def get_sql(self) -> str: 

1284 """ 

1285 Returns SQL text representing the SQLite database. 

1286 """ 

1287 return self.get_sqlite_data_as_text() 

1288 

1289 def download_now(self) -> Response: 

1290 """ 

1291 Download the data dump in the selected format 

1292 """ 

1293 filename = self.get_filename() 

1294 sql_text = self.get_sql() 

1295 return TextAttachmentResponse(body=sql_text, filename=filename) 

1296 

1297 def get_data_response(self, body: bytes, filename: str) -> Response: 

1298 """ 

1299 Unused: :meth:`download_now` is overridden to return the SQL text directly. 

1300 """ 

1301 pass 

1302 

1303 

1304# Create mapping from "viewtype" to class. 

1305# noinspection PyTypeChecker 

1306DOWNLOADER_CLASSES = {} # type: Dict[str, Type[TaskCollectionExporter]] 

1307for _cls in gen_all_subclasses( 

1308 TaskCollectionExporter 

1309): # type: Type[TaskCollectionExporter] 

1310 # noinspection PyTypeChecker 

1311 DOWNLOADER_CLASSES[_cls.viewtype] = _cls # type: ignore[index] 

1312 
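`DOWNLOADER_CLASSES` is built by walking every subclass of `TaskCollectionExporter`, including indirect ones such as `SqlExporter` (a subclass of `SqliteExporter`), and keying each by its `viewtype`. `make_exporter()` then looks up the requested type and rejects unknown values.

`gen_all_subclasses()` is imported rather than defined in this module, so the recursive `all_subclasses()` below is a hypothetical stand-in. The exporter classes are toy placeholders, and `ValueError` replaces Pyramid's `HTTPBadRequest`.

```python
from typing import Dict, Iterator, Type


def all_subclasses(cls: type) -> Iterator[type]:
    """Yield every direct and indirect subclass of ``cls`` (depth first)."""
    for sub in cls.__subclasses__():
        yield sub
        yield from all_subclasses(sub)


class BaseExporter:
    viewtype = ""


class SqliteLikeExporter(BaseExporter):
    viewtype = "sqlite"


class SqlLikeExporter(SqliteLikeExporter):  # a grandchild is still found
    viewtype = "sql"


REGISTRY: Dict[str, Type[BaseExporter]] = {
    c.viewtype: c for c in all_subclasses(BaseExporter)
}


def exporter_for(viewtype: str) -> Type[BaseExporter]:
    """Look up an exporter class, listing the valid keys on failure."""
    try:
        return REGISTRY[viewtype]
    except KeyError:
        raise ValueError(
            f"Bad output type: {viewtype!r} (permissible: {sorted(REGISTRY)!r})"
        ) from None
```

If two subclasses declared the same `viewtype`, the one visited later would silently replace the earlier one in the dictionary. Each exporter therefore needs a unique `viewtype`.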

1313 

1314def make_exporter( 

1315 req: "CamcopsRequest", 

1316 collection: "TaskCollection", 

1317 options: DownloadOptions, 

1318) -> TaskCollectionExporter: 

1319 """ 

1320 Returns an instance of the exporter class registered for ``options.viewtype``. 

1321 Args: 

1322 req: 

1323 a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

1324 collection: 

1325 a 

1326 :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection` 

1327 options: 

1328 :class:`camcops_server.cc_modules.cc_export.DownloadOptions` 

1329 governing the download 

1330 

1331 Returns: 

1332 an instance of a :class:`TaskCollectionExporter` subclass 

1333 

1334 Raises: 

1335 :exc:`HTTPBadRequest` if the arguments are bad 

1336 """ 

1337 _ = req.gettext 

1338 if options.delivery_mode not in DownloadOptions.DELIVERY_MODES: 

1339 raise HTTPBadRequest( 

1340 f"{_('Bad delivery mode:')} {options.delivery_mode!r} " 

1341 f"({_('permissible:')} " 

1342 f"{DownloadOptions.DELIVERY_MODES!r})" 

1343 ) 

1344 try: 

1345 downloader_class = DOWNLOADER_CLASSES[options.viewtype] 

1346 except KeyError: 

1347 raise HTTPBadRequest( 

1348 f"{_('Bad output type:')} {options.viewtype!r} " 

1349 f"({_('permissible:')} {list(DOWNLOADER_CLASSES)!r})" 

1350 ) 

1351 return downloader_class(req=req, collection=collection, options=options) 

1352 

1353 

1354# ============================================================================= 

1355# Represent files for users to download 

1356# ============================================================================= 

1357 

1358 

1359class UserDownloadFile(object): 

1360 """ 

1361 Represents a file that has been generated for the user to download. 

1362 

1363 Test code: 

1364 

1365 .. code-block:: python 

1366 

1367 from camcops_server.cc_modules.cc_export import UserDownloadFile 

1368 x = UserDownloadFile("/etc/hosts") 

1369 

1370 print(x.when_last_modified) # should match output of: ls -l /etc/hosts 

1371 

1372 many = UserDownloadFile.from_directory_scan("/etc") 

1373 

1374 """ 

1375 

1376 statinfo: Optional[os.stat_result] 

1377 

1378 def __init__( 

1379 self, 

1380 filename: str, 

1381 directory: str = "", 

1382 permitted_lifespan_min: float = 0, 

1383 req: "CamcopsRequest" = None, 

1384 ) -> None: 

1385 """ 

1386 Args: 

1387 filename: 

1388 Filename, either absolute, or if ``directory`` is specified, 

1389 relative to ``directory``. 

1390 directory: 

1391 Directory. If specified, ``filename`` must be within it. 

1392 req: 

1393 a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

1394 

1395 Notes: 

1396 

1397 - The Unix ``ls`` command shows timestamps in the current timezone. 

1398 Try ``TZ=utc ls -l <filename>`` or ``TZ="America/New_York" ls -l 

1399 <filename>`` to see this. 

1400 - The underlying timestamp is the time (in seconds) since the Unix 

1401 "epoch", which is 00:00:00 UTC on 1 Jan 1970 

1402 (https://en.wikipedia.org/wiki/Unix_time). 

1403 """ 

1404 self.filename = filename 

1405 self.permitted_lifespan_min = permitted_lifespan_min 

1406 self.req = req 

1407 

1408 self.basename = os.path.basename(filename) 

1409 _, self.extension = os.path.splitext(filename) 

1410 if directory: 

1411 # filename must be within the directory specified 

1412 self.directory = os.path.abspath(directory) 

1413 candidate_path = os.path.abspath( 

1414 os.path.join(self.directory, filename) 

1415 ) 

1416 if os.path.commonpath([self.directory, candidate_path]) != self.directory: 

1417 # Filename is not within directory. 

1418 # This is dodgy -- someone may have passed a filename like 

1419 # "../../dangerous_dir/unsafe_content.txt" 

1420 self.fullpath = "" 

1421 # ... ensures that "exists" will be False. 

1422 else: 

1423 self.fullpath = candidate_path 

1424 else: 

1425 # filename is treated as an absolute path 

1426 self.directory = "" 

1427 self.fullpath = filename 

1428 

1429 try: 

1430 self.statinfo = os.stat(self.fullpath) 

1431 self.exists = True 

1432 except FileNotFoundError: 

1433 self.statinfo = None 

1434 self.exists = False 

1435 
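The containment check in `UserDownloadFile.__init__()` stops a filename such as `../../dangerous_dir/unsafe_content.txt` from escaping the download directory. Below is a standalone sketch of the check using a hypothetical `resolve_within()` helper. It compares against the absolute directory on both sides.

Note that `os.path.abspath()` does not resolve symbolic links; `os.path.realpath()` would, if that matters in a given deployment. The tests use POSIX paths.

```python
import os


def resolve_within(directory: str, filename: str) -> str:
    """
    Return the absolute path of ``filename`` inside ``directory``, or "" if
    it would escape that directory (e.g. via "../" or an absolute path).
    """
    base = os.path.abspath(directory)
    candidate = os.path.abspath(os.path.join(base, filename))
    # Compare against the *absolute* base: commonpath() raises ValueError
    # when given a mix of absolute and relative paths, and a trailing slash
    # on the original directory string would otherwise never match.
    if os.path.commonpath([base, candidate]) != base:
        return ""
    return candidate
```

`os.path.commonpath()` works on whole path components. A naive `startswith()` test would wrongly accept `/srv/downloads2/x` as lying inside `/srv/downloads`; `commonpath()` rejects it.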

1436 # ------------------------------------------------------------------------- 

1437 # Size 

1438 # ------------------------------------------------------------------------- 

1439 

1440 @property 

1441 def size(self) -> Optional[int]: 

1442 """ 

1443 Size of the file, in bytes. Returns ``None`` if the file does not 

1444 exist. 

1445 """ 

1446 return self.statinfo.st_size if self.exists else None 

1447 

1448 @property 

1449 def size_str(self) -> str: 

1450 """ 

1451 Returns a pretty-format string describing the file's size. 

1452 """ 

1453 size_bytes = self.size 

1454 if size_bytes is None: 

1455 return "" 

1456 return bytes2human(size_bytes) 

1457 

1458 # ------------------------------------------------------------------------- 

1459 # Timing 

1460 # ------------------------------------------------------------------------- 

1461 

1462 @property 

1463 def when_last_modified(self) -> Optional[Pendulum]: 

1464 """ 

1465 Returns the file's modification time, or ``None`` if it doesn't exist. 

1466 

1467 (Creation time is harder! See 

1468 https://stackoverflow.com/questions/237079/how-to-get-file-creation-modification-date-times-in-python.) 

1469 """ 

1470 if not self.exists: 

1471 return None 

1472 # noinspection PyTypeChecker 

1473 modified = Pendulum.fromtimestamp( 

1474 self.statinfo.st_mtime, tz=get_tz_utc() 

1475 ) # type: Pendulum 

1476 # ... gives the correct time in the UTC timezone 

1477 # ... note that utcfromtimestamp() gives a time without a timezone, 

1478 # which is unhelpful! 

1479 # We would like this to display in the current timezone: 

1480 return modified.in_timezone(get_tz_local()) 

1481 

1482 @property 

1483 def when_last_modified_str(self) -> str: 

1484 """ 

1485 Returns a formatted string with the file's modification time. 

1486 """ 

1487 w = self.when_last_modified 

1488 if not w: 

1489 return "" 

1490 return format_datetime(w, DateFormat.ISO8601_HUMANIZED_TO_SECONDS) 

1491 

1492 @property 

1493 def time_left(self) -> Optional[Duration]: 

1494 """ 

1495 Returns the amount of time that this file has left to live before 

1496 the server will delete it. Returns ``None`` if the file does not exist. 

1497 """ 

1498 if not self.exists: 

1499 return None 

1500 now = get_now_localtz_pendulum() 

1501 death = self.when_last_modified + Duration( 

1502 minutes=self.permitted_lifespan_min 

1503 ) 

1504 remaining = death - now # type: Interval 

1505 # Note that Interval is a subclass of Duration, but its __str__() 

1506 # method is different. Duration maps __str__() to in_words(), but 

1507 # Interval maps __str__() to __repr__(). 

1508 return remaining 

1509 
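`time_left` is the last-modification time plus the permitted lifespan, minus the current time. The same arithmetic can be written with the standard `datetime` module instead of Pendulum. That substitution is for illustration only, and `time_left()` here is a hypothetical function. The result goes negative once a file is overdue for deletion.

```python
from datetime import datetime, timedelta, timezone


def time_left(
    mtime_epoch_s: float, lifespan_min: float, now: datetime
) -> timedelta:
    """
    Time remaining before a file last modified at ``mtime_epoch_s`` (seconds
    since the Unix epoch, as in os.stat().st_mtime) reaches the end of its
    permitted lifespan. Negative if it is already overdue for deletion.
    ``now`` must be timezone-aware.
    """
    modified = datetime.fromtimestamp(mtime_epoch_s, tz=timezone.utc)
    death = modified + timedelta(minutes=lifespan_min)
    return death - now
```

Because both datetimes carry a timezone, the subtraction is correct even if `now` is expressed in local time rather than UTC.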

1510 @property 

1511 def time_left_str(self) -> str: 

1512 """ 

1513 A string version of :meth:`time_left`. 

1514 """ 

1515 t = self.time_left 

1516 if not t: 

1517 return "" 

1518 return t.in_words() # Duration and Period do nice formatting 

1519 

1520 def older_than(self, when: Pendulum) -> bool: 

1521 """ 

1522 Was the file last modified before the specified time? 

1523 """ 

1524 m = self.when_last_modified 

1525 if not m: 

1526 return False 

1527 return m < when 

1528 

1529 # ------------------------------------------------------------------------- 

1530 # Deletion 

1531 # ------------------------------------------------------------------------- 

1532 

1533 @property 

1534 def delete_form(self) -> str: 

1535 """ 

1536 Returns HTML for a form to delete this file. 

1537 """ 

1538 if not self.req: 

1539 return "" 

1540 dest_url = self.req.route_url(Routes.DELETE_FILE) 

1541 form = UserDownloadDeleteForm(request=self.req, action=dest_url) 

1542 appstruct = {ViewParam.FILENAME: self.filename} 

1543 rendered_form = form.render(appstruct) 

1544 return rendered_form 

1545 

1546 def delete(self) -> None: 

1547 """ 

1548 Deletes the file. Does not raise an exception if the file does not 

1549 exist. 

1550 """ 

1551 try: 

1552 os.remove(self.fullpath) 

1553 log.info(f"Deleted file: {self.fullpath}") 

1554 except OSError: 

1555 pass 

1556 

1557 # ------------------------------------------------------------------------- 

1558 # Downloading 

1559 # ------------------------------------------------------------------------- 

1560 

1561 @property 

1562 def download_url(self) -> str: 

1563 """ 

1564 Returns a URL to download this file. 

1565 """ 

1566 if not self.req: 

1567 return "" 

1568 querydict = {ViewParam.FILENAME: self.filename} 

1569 return self.req.route_url(Routes.DOWNLOAD_FILE, _query=querydict) 

1570 

1571 @property 

1572 def contents(self) -> Optional[bytes]: 

1573 """ 

1574 The file contents. May raise :exc:`OSError` if the read fails. 

1575 """ 

1576 if not self.exists: 

1577 return None 

1578 with open(self.fullpath, "rb") as f: 

1579 return f.read() 

1580 

1581 # ------------------------------------------------------------------------- 

1582 # Bulk creation 

1583 # ------------------------------------------------------------------------- 

1584 

1585 @classmethod 

1586 def from_directory_scan( 

1587 cls, 

1588 directory: str, 

1589 permitted_lifespan_min: float = 0, 

1590 req: "CamcopsRequest" = None, 

1591 ) -> List["UserDownloadFile"]: 

1592 """ 

1593 Scans the directory and returns a list of :class:`UserDownloadFile` 

1594 objects, one for each file in the directory. 

1595 

1596 For each object, ``directory`` is the root directory (our parameter 

1597 here), and ``filename`` is the filename RELATIVE to that. 

1598 

1599 Args: 

1600 directory: directory to scan 

1601 permitted_lifespan_min: lifespan for each file 

1602 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

1603 """ 

1604 results = [] # type: List[UserDownloadFile] 

1605 # Imagine directory == "/etc": 

1606 for root, dirs, files in os.walk(directory): 

1607 # ... then root might at times be "/etc/apache2" 

1608 for f in files: 

1609 fullpath = os.path.join(root, f) 

1610 relative_filename = relative_filename_within_dir( 

1611 fullpath, directory 

1612 ) 

1613 results.append( 

1614 UserDownloadFile( 

1615 filename=relative_filename, 

1616 directory=directory, 

1617 permitted_lifespan_min=permitted_lifespan_min, 

1618 req=req, 

1619 ) 

1620 ) 

1621 return results
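`from_directory_scan()` walks the directory tree and records each file's path relative to the scanned root. Below is a standard-library-only sketch, assuming that `relative_filename_within_dir()` behaves like `os.path.relpath()` for files inside the directory. `scan_relative_files()` is hypothetical, and the final sort is an addition because `os.walk()` gives no ordering guarantee.

```python
import os
from typing import List


def scan_relative_files(directory: str) -> List[str]:
    """
    Walk ``directory`` recursively and return every file's path relative to
    it, sorted for reproducibility.
    """
    results: List[str] = []
    for root, _dirs, files in os.walk(directory):
        for f in files:
            fullpath = os.path.join(root, f)
            results.append(os.path.relpath(fullpath, directory))
    return sorted(results)
```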