Coverage for cc_modules/celery.py: 40%

132 statements

coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

"""
camcops_server/cc_modules/celery.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Celery app.**

Basic steps to set up Celery:

- Our app will be "camcops_server.cc_modules".
- Within that, Celery expects a ``celery.py`` module, in which configuration
  is set up by defining the ``app`` object.
- Also, in ``__init__.py``, we should import that app. (In practice, this
  turned out not to be necessary.)
- That makes ``@shared_task`` work in all other modules here.
- Finally, here, we ask Celery to scan ``tasks.py`` to find tasks.

Modified:

- The ``@shared_task`` decorator doesn't offer all the options that
  ``@app.task`` has. Let's skip ``@shared_task`` and the extra faff that it
  entails.

The difficult part seems to be getting a broker URL into the config.

- If we load the config here, from ``celery.py``, and the config uses any
  SQLAlchemy objects, it will crash, because some of them will not yet have
  been imported.
- A better way is to delay configuring the app.
- Even so, it is very tricky if the config uses SQLAlchemy objects; so it
  shouldn't.
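
Celery's ``Celery.add_defaults()`` also accepts a callable instead of a
dictionary. Celery treats the callable as a promise and calls it only when
the configuration is first needed. Below is a minimal sketch of that kind of
delayed configuration. ``get_settings()`` and the in-memory broker URL are
hypothetical placeholders for a real settings function and broker:

.. code-block:: python

    from celery import Celery

    calls = []


    def get_settings() -> dict:
        # Hypothetical settings factory; a real one might read a config file.
        calls.append("called")
        return {"broker_url": "memory://"}


    app = Celery()
    app.add_defaults(get_settings)  # passed uncalled: stored as a promise
    print(app.conf.broker_url)  # first use of the config triggers the call

Passing the result of a call instead, ``add_defaults(get_settings())``,
evaluates the settings immediately, at import time.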

Note also re logging:

- The log here is configured (at times, at least) by Celery, so it uses
  Celery's log settings. At startup, its output therefore looks like plain
  ``print()`` statements.

**In general, prefer delayed imports during actual tasks. Otherwise circular
imports are very hard to avoid.**

If using a separate ``celery_tasks.py`` file:

- Import it only after ``celery.py``, or the decorators will fail.

- If you see this error from ``camcops_server launch_workers`` when using a
  separate ``celery_tasks.py`` file:

  .. code-block:: none

    [2018-12-26 21:08:01,316: ERROR/MainProcess] Received unregistered task of type 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'.
    The message has been ignored and discarded.

    Did you remember to import the module containing this task?
    Or maybe you're using relative imports?

    Please see
    https://docs.celeryq.org/en/latest/internals/protocol.html
    for more information.

    The full contents of the message body was:
    '[["recipient_email_rnc"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (98b)
    Traceback (most recent call last):
      File "/home/rudolf/dev/venvs/camcops/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 558, in on_task_received
        strategy = strategies[type_]
    KeyError: 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'

  then (1) run with ``--verbose``, which will show you the list of registered
  tasks; (2) note that none of the tasks defined here are listed; (3) insert
  a "crash" line at the top of this file and re-run; (4) note what is
  importing this file too early.

General advice:

- https://medium.com/@taylorhughes/three-quick-tips-from-two-years-with-celery-c05ff9d7f9eb

Task decorator options:

- https://docs.celeryproject.org/en/latest/reference/celery.app.task.html
- ``bind``: passes the task instance as the first (``self``) argument, so
  that the function can manipulate the task itself;
  https://docs.celeryproject.org/en/latest/userguide/tasks.html#example
- ``acks_late`` (for the decorator) or ``task_acks_late`` (as a setting): see

  - https://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_acks_late
  - https://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry
  - Here I am retrying on failure with exponential backoff, but not also
    using ``acks_late``.

"""  # noqa

from contextlib import contextmanager
import logging
import os
from typing import Any, Dict, Generator, TYPE_CHECKING

from cardinal_pythonlib.json.serialize import json_encode, json_decode
from cardinal_pythonlib.logs import BraceStyleAdapter
from celery import Celery, current_task
from kombu.serialization import register

# TODO: Investigate
# "from numpy.random import uniform" leads to uniform ending up in the
# documentation for this file and Sphinx error:
# celery.py:docstring of camcops_server.cc_modules.celery.uniform:9:
# undefined label: random-quick-start
from numpy import random

# noinspection PyUnresolvedReferences
import camcops_server.cc_modules.cc_all_models  # import side effects (ensure all models registered)  # noqa

if TYPE_CHECKING:
    from celery.app.task import Task as CeleryTask
    from camcops_server.cc_modules.cc_export import DownloadOptions
    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_taskcollection import TaskCollection

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Constants
# =============================================================================

CELERY_APP_NAME = "camcops_server.cc_modules"
# CELERY_TASKS_MODULE = "celery_tasks"
# ... look for "celery_tasks.py" (as opposed to the more common "tasks.py")

CELERY_TASK_MODULE_NAME = CELERY_APP_NAME + ".celery"

CELERY_SOFT_TIME_LIMIT_SEC = 300.0
MAX_RETRIES = 10
RETRY_MIN_DELAY_S = 5.0
RETRY_MAX_DELAY_S = 60.0


# =============================================================================
# Configuration
# =============================================================================

register(
    "json",
    json_encode,
    json_decode,
    content_type="application/json",
    content_encoding="utf-8",
)


def get_celery_settings_dict() -> Dict[str, Any]:
    """
    Returns a dictionary of settings to configure Celery.
    """
    log.debug("Configuring Celery")
    from camcops_server.cc_modules.cc_config import (
        CrontabEntry,
        get_default_config_from_os_env,
    )  # delayed import

    config = get_default_config_from_os_env()

    # -------------------------------------------------------------------------
    # Schedule
    # -------------------------------------------------------------------------
    schedule = {}  # type: Dict[str, Any]

    # -------------------------------------------------------------------------
    # User-defined schedule entries
    # -------------------------------------------------------------------------
    for crontab_entry in config.crontab_entries:
        recipient_name = crontab_entry.content
        schedule_name = f"export_to_{recipient_name}"
        log.debug(
            "Adding regular export job {}: crontab: {}",
            schedule_name,
            crontab_entry,
        )
        schedule[schedule_name] = {
            "task": CELERY_TASK_MODULE_NAME + ".export_to_recipient_backend",
            "schedule": crontab_entry.get_celery_schedule(),
            "args": (recipient_name,),
        }

    # -------------------------------------------------------------------------
    # Housekeeping once per minute
    # -------------------------------------------------------------------------
    housekeeping_crontab = CrontabEntry(minute="*", content="dummy")
    schedule["housekeeping"] = {
        "task": CELERY_TASK_MODULE_NAME + ".housekeeping",
        "schedule": housekeeping_crontab.get_celery_schedule(),
    }

    # -------------------------------------------------------------------------
    # Final Celery settings
    # -------------------------------------------------------------------------
    return {
        "beat_schedule": schedule,
        "broker_url": config.celery_broker_url,
        "timezone": config.schedule_timezone,
        "task_annotations": {
            "camcops_server.cc_modules.celery.export_task_backend": {
                "rate_limit": config.celery_export_task_rate_limit
            }
        },
        # "worker_log_color": True,  # true by default for consoles anyway
    }


# =============================================================================
# The Celery app
# =============================================================================

celery_app = Celery()
celery_app.add_defaults(get_celery_settings_dict())
# celery_app.autodiscover_tasks([CELERY_APP_NAME],
#                               related_name=CELERY_TASKS_MODULE)

_ = """

@celery_app.on_configure.connect
def _app_on_configure(**kwargs) -> None:
    log.critical("@celery_app.on_configure: {!r}", kwargs)


@celery_app.on_after_configure.connect
def _app_on_after_configure(**kwargs) -> None:
    log.critical("@celery_app.on_after_configure: {!r}", kwargs)

"""


# =============================================================================
# Test tasks
# =============================================================================


@celery_app.task(bind=True)
def debug_task(self: "CeleryTask") -> None:
    """
    Test as follows:

    .. code-block:: python

        from camcops_server.cc_modules.celery import *
        debug_task.delay()

    and also launch workers with ``camcops_server launch_workers``.

    For a bound task, the first (``self``) argument is the task instance; see
    https://docs.celeryproject.org/en/latest/userguide/tasks.html#bound-tasks

    """
    log.info(f"self: {self!r}")
    log.info(f"Backend: {current_task.backend}")


@celery_app.task
def debug_task_add(a: float, b: float) -> float:
    """
    Test as follows:

    .. code-block:: python

        from camcops_server.cc_modules.celery import *
        debug_task_add.delay(1, 2)  # the task requires both arguments
    """
    result = a + b
    log.info("a = {}, b = {} => a + b = {}", a, b, result)
    return result


# =============================================================================
# Exponential backoff
# =============================================================================


def backoff_delay_s(attempts: int) -> float:
    """
    Return a backoff delay, in seconds, given a number of attempts.

    The delay increases very rapidly with the number of attempts:
    1, 2, 4, 8, 16, 32, ...

    As per https://blog.balthazar-rouberol.com/celery-best-practices.
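
    Here is a worked example. It is a sketch that assumes ``attempts`` is
    Celery's ``self.request.retries``, which starts at 0, and that a task may
    be retried ``MAX_RETRIES = 10`` times, as configured in this module. It
    shows the individual retry delays and their total:

    .. code-block:: python

        MAX_RETRIES = 10


        def backoff_delay_s(attempts: int) -> float:
            return 2.0**attempts


        delays = [backoff_delay_s(n) for n in range(MAX_RETRIES)]
        print(delays[:6])  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0]
        print(delays[-1])  # 512.0, the longest single wait
        print(sum(delays))  # 1023.0 s, about 17 minutes

    The total ignores the time taken by the failed attempts themselves.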

    """
    return 2.0**attempts


def jittered_delay_s() -> float:
    """
    Returns a retry delay, in seconds, that is jittered: drawn uniformly at
    random between ``RETRY_MIN_DELAY_S`` and ``RETRY_MAX_DELAY_S``.
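
    Jitter spreads out the retries of many tasks that fail at the same
    moment (for example, because a shared database became unavailable), so
    that they do not all retry simultaneously. The sketch below shows the
    same idea using only the standard library. It substitutes Python's
    ``random.uniform`` for the ``numpy.random.uniform`` call made here:

    .. code-block:: python

        import random

        RETRY_MIN_DELAY_S = 5.0
        RETRY_MAX_DELAY_S = 60.0


        def jittered_delay_s() -> float:
            # Each call gives an independent, uniformly distributed delay.
            return random.uniform(RETRY_MIN_DELAY_S, RETRY_MAX_DELAY_S)


        delays = [jittered_delay_s() for _ in range(5)]
        # Each value lies between 5 and 60 seconds.
        print([round(d, 1) for d in delays])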

    """
    return random.uniform(RETRY_MIN_DELAY_S, RETRY_MAX_DELAY_S)


@contextmanager
def retry_backoff_if_raises(self: "CeleryTask") -> Generator[None, None, None]:
    """
    Context manager to retry a Celery task if an exception is raised, using a
    "backoff" method (see :func:`backoff_delay_s`).
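
    ``self.retry()`` normally raises ``celery.exceptions.Retry``, which ends
    the current run once the task has been re-sent for a later attempt. If
    ``max_retries`` has already been reached, it re-raises ``exc`` instead.
    Either way, nothing after the ``with`` block executes. The control flow
    can be sketched with the standard library alone. ``StubRetry`` and
    ``StubTask`` are hypothetical stand-ins for Celery's ``Retry`` exception
    and a bound task:

    .. code-block:: python

        from contextlib import contextmanager
        from types import SimpleNamespace


        class StubRetry(Exception):
            pass  # hypothetical stand-in for celery.exceptions.Retry


        class StubTask:
            # Hypothetical stand-in for a bound Celery task.
            def __init__(self, retries: int) -> None:
                self.request = SimpleNamespace(retries=retries)
                self.countdown = None

            def retry(self, countdown: float, exc: Exception) -> None:
                self.countdown = countdown  # record the requested delay
                raise StubRetry() from exc


        @contextmanager
        def retry_backoff_if_raises(task):
            try:
                yield
            except Exception as exc:
                task.retry(countdown=2.0**task.request.retries, exc=exc)


        task = StubTask(retries=3)
        try:
            with retry_backoff_if_raises(task):
                raise ConnectionError("database unavailable")
        except StubRetry:
            print(task.countdown)  # 8.0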

    """
    try:
        yield
    except Exception as exc:
        delay_s = backoff_delay_s(self.request.retries)
        log.error(
            "Task failed. Backing off. Will retry after {} s. "
            "Error was:\n{}",
            delay_s,
            exc,
        )
        self.retry(countdown=delay_s, exc=exc)


@contextmanager
def retry_jitter_if_raises(self: "CeleryTask") -> Generator[None, None, None]:
    """
    Context manager to retry a Celery task if an exception is raised, using a
    "jittered delay" method (see :func:`jittered_delay_s`).
    """
    try:
        yield
    except Exception as exc:
        delay_s = jittered_delay_s()
        log.error(
            "Task failed. Will retry after jittered delay: {} s. "
            "Error was:\n{}",
            delay_s,
            exc,
        )
        self.retry(countdown=delay_s, exc=exc)


# =============================================================================
# Controlling tasks
# =============================================================================


def purge_jobs() -> None:
    """
    Purge all jobs from the Celery queue.
    """
    log.info("Purging back-end (Celery) jobs")
    celery_app.control.purge()
    log.info("... purged.")


# =============================================================================
# Note re request creation and context manager
# =============================================================================
# NOTE:
# - You MUST use some sort of context manager to handle requests here, because
#   the normal Pyramid router [which ordinarily calls the "finished" callbacks
#   via request._process_finished_callbacks()] will not be plumbed in.
# - For debugging, use the MySQL command
#   SELECT * FROM information_schema.innodb_locks;


# =============================================================================
# Export tasks
# =============================================================================


@celery_app.task(
    bind=True,
    ignore_result=True,
    max_retries=MAX_RETRIES,
    soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC,
)
def export_task_backend(
    self: "CeleryTask", recipient_name: str, basetable: str, task_pk: int
) -> None:
    """
    This function exports a single task but does so with only simple (string,
    integer) information, so it can be called via the Celery task queue.

    - Calls :func:`camcops_server.cc_modules.cc_export.export_task`.

    Args:
        self: the Celery task, :class:`celery.app.task.Task`
        recipient_name: export recipient name (as per the config file)
        basetable: name of the task's base table
        task_pk: server PK of the task
    """
    from camcops_server.cc_modules.cc_export import (
        export_task,
    )  # delayed import
    from camcops_server.cc_modules.cc_request import (
        command_line_request_context,
    )  # delayed import
    from camcops_server.cc_modules.cc_taskfactory import (
        task_factory_no_security_checks,
    )  # delayed import

    with retry_backoff_if_raises(self):
        with command_line_request_context() as req:
            recipient = req.get_export_recipient(recipient_name)
            task = task_factory_no_security_checks(
                req.dbsession, basetable, task_pk
            )
            if task is None:
                log.error(
                    "export_task_backend for recipient {!r}: No task "
                    "found for {} {}",
                    recipient_name,
                    basetable,
                    task_pk,
                )
                return
            export_task(req, recipient, task)


@celery_app.task(
    bind=True,
    ignore_result=True,
    max_retries=MAX_RETRIES,
    soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC,
)
def export_to_recipient_backend(
    self: "CeleryTask", recipient_name: str
) -> None:
    """
    From the backend, exports all pending tasks for a given recipient.

    - Calls :func:`camcops_server.cc_modules.cc_export.export`.

    There are two ways of doing this when we call
    :func:`camcops_server.cc_modules.cc_export.export`. If we set
    ``schedule_via_backend=True``, this backend job fires up a whole bunch of
    other backend jobs, one per task to export. If we set
    ``schedule_via_backend=False``, our current backend job does all the work.

    Which is best?

    - Keeping it to one job is perhaps a bit simpler.
    - But everything is locked independently, so we can do the multi-job
      version, and we may as well use all the workers available. So my
      thought was to use ``schedule_via_backend=True``.
    - However, that led to database deadlocks (multiple processes trying to
      write a new ``ExportRecipient``).
    - With some bugfixes to equality checking and a global lock (see
      :meth:`camcops_server.cc_modules.cc_config.CamcopsConfig.get_master_export_recipient_lockfilename`),
      we can try again with ``True``.
    - Yup, works nicely.
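
    The difference between the two strategies can be sketched with the
    standard library alone. ``enqueue`` and ``export_one`` are hypothetical
    stand-ins: the first for queuing a per-task job (such as
    :func:`export_task_backend`), the second for exporting a task within the
    current job. The table names are illustrative only:

    .. code-block:: python

        from typing import Callable, List, Tuple

        TaskRef = Tuple[str, int]  # (basetable, task_pk)


        def export_pending(
            pending: List[TaskRef],
            schedule_via_backend: bool,
            enqueue: Callable[[str, int], None],
            export_one: Callable[[str, int], None],
        ) -> None:
            for basetable, task_pk in pending:
                if schedule_via_backend:
                    # Fan out: queue one job per task, so that idle
                    # workers can share the load.
                    enqueue(basetable, task_pk)
                else:
                    # Do all the work inside the current job.
                    export_one(basetable, task_pk)


        queued: List[TaskRef] = []
        exported: List[TaskRef] = []
        export_pending(
            [("phq9", 1), ("gad7", 2)],
            schedule_via_backend=True,
            enqueue=lambda table, pk: queued.append((table, pk)),
            export_one=lambda table, pk: exported.append((table, pk)),
        )
        print(queued)  # [('phq9', 1), ('gad7', 2)]
        print(exported)  # []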

    Args:
        self: the Celery task, :class:`celery.app.task.Task`
        recipient_name: export recipient name (as per the config file)
    """
    from camcops_server.cc_modules.cc_export import export  # delayed import
    from camcops_server.cc_modules.cc_request import (
        command_line_request_context,
    )  # delayed import

    with retry_backoff_if_raises(self):
        with command_line_request_context() as req:
            export(
                req,
                recipient_names=[recipient_name],
                schedule_via_backend=True,
            )


@celery_app.task(
    bind=True,
    ignore_result=True,
    max_retries=MAX_RETRIES,
    soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC,
)
def email_basic_dump(
    self: "CeleryTask",
    collection: "TaskCollection",
    options: "DownloadOptions",
) -> None:
    """
    Send a research dump to the user via e-mail.

    Args:
        self:
            the Celery task, :class:`celery.app.task.Task`
        collection:
            a
            :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`
        options:
            :class:`camcops_server.cc_modules.cc_export.DownloadOptions`
            governing the download
    """
    from camcops_server.cc_modules.cc_export import (
        make_exporter,
    )  # delayed import
    from camcops_server.cc_modules.cc_request import (
        command_line_request_context,
    )  # delayed import

    with retry_backoff_if_raises(self):
        # Create request for a specific user, so the auditing is correct.
        with command_line_request_context(user_id=options.user_id) as req:
            collection.set_request(req)
            exporter = make_exporter(
                req=req, collection=collection, options=options
            )
            exporter.send_by_email()


@celery_app.task(
    bind=True,
    ignore_result=True,
    max_retries=MAX_RETRIES,
    soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC,
)
def create_user_download(
    self: "CeleryTask",
    collection: "TaskCollection",
    options: "DownloadOptions",
) -> None:
    """
    Create a research dump file for the user to download later.
    Let them know by e-mail.

    Args:
        self:
            the Celery task, :class:`celery.app.task.Task`
        collection:
            a
            :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`
        options:
            :class:`camcops_server.cc_modules.cc_export.DownloadOptions`
            governing the download
    """
    from camcops_server.cc_modules.cc_export import (
        make_exporter,
    )  # delayed import
    from camcops_server.cc_modules.cc_request import (
        command_line_request_context,
    )  # delayed import

    with retry_backoff_if_raises(self):
        # Create request for a specific user, so the auditing is correct.
        with command_line_request_context(user_id=options.user_id) as req:
            collection.set_request(req)
            exporter = make_exporter(
                req=req, collection=collection, options=options
            )
            exporter.create_user_download_and_email()


# =============================================================================
# Housekeeping
# =============================================================================


def delete_old_user_downloads(req: "CamcopsRequest") -> None:
    """
    Deletes user download files that are past their expiry time.
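
    Below is a stdlib-only sketch of the same expiry rule. For illustration,
    it assumes that a file's age is judged by its modification time; the real
    :class:`camcops_server.cc_modules.cc_export.UserDownloadFile` may judge
    age differently. ``delete_files_older_than`` is a hypothetical helper,
    not part of CamCOPS:

    .. code-block:: python

        import os
        import tempfile
        import time


        def delete_files_older_than(directory: str, lifetime_s: float) -> int:
            # Anything last modified before this moment has expired.
            oldest_allowed = time.time() - lifetime_s
            n_deleted = 0
            for root, _, files in os.walk(directory):
                for name in files:
                    path = os.path.join(root, name)
                    if os.path.getmtime(path) < oldest_allowed:
                        os.remove(path)
                        n_deleted += 1
            return n_deleted


        with tempfile.TemporaryDirectory() as tmp:
            old_path = os.path.join(tmp, "old.zip")
            new_path = os.path.join(tmp, "new.zip")
            for path in (old_path, new_path):
                open(path, "w").close()
            two_days_ago = time.time() - 2 * 86400
            os.utime(old_path, (two_days_ago, two_days_ago))
            print(delete_files_older_than(tmp, lifetime_s=86400))  # 1
            print(os.listdir(tmp))  # ['new.zip']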

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
    """
    from camcops_server.cc_modules.cc_export import (
        UserDownloadFile,
    )  # delayed import

    now = req.now
    lifetime = req.user_download_lifetime_duration
    oldest_allowed = now - lifetime
    log.debug(f"Deleting any user download files older than {oldest_allowed}")
    for root, dirs, files in os.walk(req.config.user_download_dir):
        for f in files:
            udf = UserDownloadFile(filename=f, directory=root)
            if udf.older_than(oldest_allowed):
                udf.delete()


@celery_app.task(
    bind=False, ignore_result=True, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC
)
def housekeeping() -> None:
    """
    Function that is run regularly to do cleanup tasks.

    (Remember that the ``bind`` parameter to ``@celery_app.task()`` means that
    the first argument to the function, typically called ``self``, is the
    Celery task. We don't need it here. See
    https://docs.celeryproject.org/en/latest/userguide/tasks.html#bound-tasks.)
    """
    from camcops_server.cc_modules.cc_request import (
        command_line_request_context,
    )  # delayed import
    from camcops_server.cc_modules.cc_session import (
        CamcopsSession,
    )  # delayed import
    from camcops_server.cc_modules.cc_user import (
        SecurityAccountLockout,
        SecurityLoginFailure,
    )  # delayed import

    log.debug("Housekeeping!")
    with command_line_request_context() as req:
        # ---------------------------------------------------------------------
        # Housekeeping tasks
        # ---------------------------------------------------------------------
        # We had a problem with MySQL locking here (two locks open for what
        # appeared to be a single delete, followed by a lock timeout). Seems to
        # be working now.
        CamcopsSession.delete_old_sessions(req)
        SecurityAccountLockout.delete_old_account_lockouts(req)
        SecurityLoginFailure.clear_dummy_login_failures_if_necessary(req)
        delete_old_user_downloads(req)