Coverage for cc_modules/celery.py: 40%
132 statements
coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
"""
camcops_server/cc_modules/celery.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Celery app.**

Basic steps to set up Celery:

- Our app will be "camcops_server.cc_modules".
- Within that, Celery expects "celery.py", in which configuration is set up
  by defining the ``app`` object.
- It is not necessary to also import that app in ``__init__.py``.
- That makes ``@shared_task`` work in all other modules here.
- Finally, here, we ask Celery to scan ``tasks.py`` to find tasks.

Modified:

- The ``@shared_task`` decorator doesn't offer all the options that
  ``@app.task`` has. Let's skip ``@shared_task`` and the increased faff that
  entails.

The difficult part seems to be getting a broker URL in the config.

- If we load the config here, from ``celery.py``, then if the config uses any
  SQLAlchemy objects, it'll crash because some aren't imported.
- A better way is to delay configuring the app.
- But also, it is very tricky if the config uses SQLAlchemy objects; so it
  shouldn't.
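
One way to delay configuration is to make the settings a lazily computed
property, so that nothing is read from the config until the app first needs
it. The sketch below shows the idea using only the standard library;
``LazySettingsApp``, ``load_settings`` and the ``EXAMPLE_BROKER_URL``
environment variable are hypothetical names, not part of CamCOPS or Celery.

```python
import functools
import os
from typing import Any, Callable, Dict, List

load_calls: List[int] = []


def load_settings() -> Dict[str, Any]:
    # Hypothetical loader. In real code this might read a config file that
    # in turn needs other modules (e.g. ORM models) to be importable first.
    load_calls.append(1)
    return {
        "broker_url": os.environ.get(
            "EXAMPLE_BROKER_URL", "amqp://localhost:5672//"
        )
    }


class LazySettingsApp:
    # Hypothetical app object that defers building its settings.

    def __init__(self, settings_factory: Callable[[], Dict[str, Any]]) -> None:
        self._settings_factory = settings_factory

    @functools.cached_property
    def settings(self) -> Dict[str, Any]:
        # Runs the factory on first access only, then caches the result.
        return self._settings_factory()


app = LazySettingsApp(load_settings)
assert load_calls == []  # creating the app read no configuration
broker_url = app.settings["broker_url"]  # first access triggers the load
_ = app.settings  # later accesses reuse the cached dictionary
assert len(load_calls) == 1
```

Celery's ``app.add_defaults()`` is documented as also accepting a callable,
which it treats as a promise and evaluates only when the configuration is
actually needed.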

Note also re logging:

- The log here is configured (at times, at least) by Celery, so uses its log
  settings. At the time of startup, that looks like plain ``print()``
  statements.

**In general, prefer delayed imports during actual tasks. Otherwise circular
imports are very hard to avoid.**
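
To see why, consider two modules that import each other. In the sketch below
(hypothetical modules ``mod_a`` and ``mod_b``, written to a temporary
directory), ``mod_a`` imports ``mod_b`` while ``mod_a`` is still loading. If
``mod_b`` imports a name from ``mod_a`` at the top of its file, it finds
``mod_a`` only partly initialized and the import fails; if ``mod_b`` delays
that import until its function actually runs, both modules load cleanly.

```python
import importlib
import sys
import tempfile
from pathlib import Path

MOD_A = '''
import mod_b  # mod_a needs mod_b while mod_a itself is still loading

NAME = "a"


def greet():
    return "hello from " + mod_b.partner()
'''

# Top-level import: runs while mod_a is only partly initialized.
MOD_B_EAGER = '''
from mod_a import NAME


def partner():
    return NAME
'''

# Delayed import: runs only when partner() is called.
MOD_B_DELAYED = '''
def partner():
    from mod_a import NAME  # delayed import
    return NAME
'''


def try_import(mod_b_source: str) -> str:
    # Write both modules to a fresh directory, import mod_a, call greet().
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "mod_a.py").write_text(MOD_A)
        Path(tmp, "mod_b.py").write_text(mod_b_source)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            return importlib.import_module("mod_a").greet()
        except ImportError as exc:
            return "ImportError: " + str(exc)
        finally:
            # Undo our changes so each call starts from a clean state.
            sys.path.remove(tmp)
            for name in ("mod_a", "mod_b"):
                sys.modules.pop(name, None)


print(try_import(MOD_B_EAGER))  # prints an ImportError message
print(try_import(MOD_B_DELAYED))  # hello from a
```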

If using a separate ``celery_tasks.py`` file:

- Import this only after celery.py, or the decorators will fail.

- If you see this error from ``camcops_server launch_workers`` when using a
  separate ``celery_tasks.py`` file:

  .. code-block:: none

      [2018-12-26 21:08:01,316: ERROR/MainProcess] Received unregistered task of type 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'.
      The message has been ignored and discarded.

      Did you remember to import the module containing this task?
      Or maybe you're using relative imports?

      Please see
      https://docs.celeryq.org/en/latest/internals/protocol.html
      for more information.

      The full contents of the message body was:
      '[["recipient_email_rnc"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (98b)
      Traceback (most recent call last):
        File "/home/rudolf/dev/venvs/camcops/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 558, in on_task_received
          strategy = strategies[type_]
      KeyError: 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'

  then (1) run with ``--verbose``, which will show you the list of registered
  tasks; (2) note that everything here is absent; (3) insert a "crash" line at
  the top of this file and re-run; (4) note what's importing this file too
  early.
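
The ``KeyError`` above comes from the worker looking up the incoming task name
in its table of known tasks. The toy sketch below (hypothetical names; a
simplification, not Celery's actual implementation) shows why that lookup can
fail: a task is registered only as a side effect of its decorator running,
i.e. when the module defining it is imported, and under its fully qualified
name. A message naming a task whose module was never imported (or that lives
under a different module path) finds nothing.

```python
from typing import Callable, Dict

# Hypothetical registry: fully qualified task name -> function.
REGISTRY: Dict[str, Callable[..., str]] = {}


def task(func: Callable[..., str]) -> Callable[..., str]:
    # Registration happens when this decorator runs, i.e. when the module
    # that defines the task is imported.
    REGISTRY[func.__module__ + "." + func.__name__] = func
    return func


def on_message(task_name: str, *args: str) -> str:
    # Mimics a worker receiving a message that names a task.
    try:
        handler = REGISTRY[task_name]
    except KeyError:
        return "unregistered task: " + task_name
    return handler(*args)


@task
def export_to_recipient_backend(recipient_name: str) -> str:
    return "exporting to " + recipient_name


known_name = __name__ + ".export_to_recipient_backend"
unknown_name = (
    "camcops_server.cc_modules.celery_tasks.export_to_recipient_backend"
)
print(sorted(REGISTRY))  # rough analogue of listing registered tasks
print(on_message(known_name, "recipient_email_rnc"))
# exporting to recipient_email_rnc
print(on_message(unknown_name, "recipient_email_rnc"))
# unregistered task: camcops_server.cc_modules.celery_tasks.export_to_...
```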

General advice:

- https://medium.com/@taylorhughes/three-quick-tips-from-two-years-with-celery-c05ff9d7f9eb

Task decorator options:

- https://docs.celeryproject.org/en/latest/reference/celery.app.task.html
- ``bind``: makes the first argument a ``self`` parameter to manipulate the
  task itself;
  https://docs.celeryproject.org/en/latest/userguide/tasks.html#example
- ``acks_late`` (the task decorator option) or ``task_acks_late`` (the
  equivalent global setting): see

  - https://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_acks_late
  - https://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry
  - Here I am retrying on failure with exponential backoff, but not using
    ``acks_late`` in addition.
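
For a sense of scale, the sketch below reproduces the two delay formulas used
by ``backoff_delay_s()`` and ``jittered_delay_s()`` in this module, with the
standard library's ``random`` standing in for NumPy. It assumes that
``self.request.retries`` counts from 0 and that a task is retried at most
``MAX_RETRIES`` (10) times. Under those assumptions, exponential backoff waits
1, 2, 4, ... 512 s between attempts, 1023 s (about 17 minutes) in total,
whereas each jittered delay is drawn uniformly from 5 to 60 s.

```python
import random
from typing import List

MAX_RETRIES = 10
RETRY_MIN_DELAY_S = 5.0
RETRY_MAX_DELAY_S = 60.0


def backoff_delay_s(attempts: int) -> float:
    # Same formula as backoff_delay_s() in this module.
    return 2.0**attempts


def jittered_delay_s(rng: random.Random) -> float:
    # Stdlib stand-in for the NumPy-based jittered_delay_s() in this module.
    return rng.uniform(RETRY_MIN_DELAY_S, RETRY_MAX_DELAY_S)


backoff: List[float] = [backoff_delay_s(n) for n in range(MAX_RETRIES)]
print(backoff[0], backoff[-1])  # 1.0 512.0
print(sum(backoff))  # 1023.0

rng = random.Random(42)  # seeded so the jittered schedule is reproducible
jitter: List[float] = [jittered_delay_s(rng) for _ in range(MAX_RETRIES)]
assert all(RETRY_MIN_DELAY_S <= d <= RETRY_MAX_DELAY_S for d in jitter)
```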

"""  # noqa

from contextlib import contextmanager
import logging
import os
from typing import Any, Dict, Generator, TYPE_CHECKING

from cardinal_pythonlib.json.serialize import json_encode, json_decode
from cardinal_pythonlib.logs import BraceStyleAdapter
from celery import Celery, current_task
from kombu.serialization import register

# TODO: Investigate
# "from numpy.random import uniform" leads to uniform ending up in the
# documentation for this file and Sphinx error:
# celery.py:docstring of camcops_server.cc_modules.celery.uniform:9:
# undefined label: random-quick-start
from numpy import random

# noinspection PyUnresolvedReferences
import camcops_server.cc_modules.cc_all_models  # import side effects (ensure all models registered) # noqa

if TYPE_CHECKING:
    from celery.app.task import Task as CeleryTask
    from camcops_server.cc_modules.cc_export import DownloadOptions
    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_taskcollection import TaskCollection

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Constants
# =============================================================================

CELERY_APP_NAME = "camcops_server.cc_modules"
# CELERY_TASKS_MODULE = "celery_tasks"
# ... look for "celery_tasks.py" (as opposed to the more common "tasks.py")

CELERY_TASK_MODULE_NAME = CELERY_APP_NAME + ".celery"

CELERY_SOFT_TIME_LIMIT_SEC = 300.0
MAX_RETRIES = 10
RETRY_MIN_DELAY_S = 5.0
RETRY_MAX_DELAY_S = 60.0


# =============================================================================
# Configuration
# =============================================================================

register(
    "json",
    json_encode,
    json_decode,
    content_type="application/json",
    content_encoding="utf-8",
)


def get_celery_settings_dict() -> Dict[str, Any]:
    """
    Returns a dictionary of settings to configure Celery.
    """
    log.debug("Configuring Celery")
    from camcops_server.cc_modules.cc_config import (
        CrontabEntry,
        get_default_config_from_os_env,
    )  # delayed import

    config = get_default_config_from_os_env()

    # -------------------------------------------------------------------------
    # Schedule
    # -------------------------------------------------------------------------
    schedule = {}  # type: Dict[str, Any]

    # -------------------------------------------------------------------------
    # User-defined schedule entries
    # -------------------------------------------------------------------------
    for crontab_entry in config.crontab_entries:
        recipient_name = crontab_entry.content
        schedule_name = f"export_to_{recipient_name}"
        log.debug(
            "Adding regular export job {}: crontab: {}",
            schedule_name,
            crontab_entry,
        )
        schedule[schedule_name] = {
            "task": CELERY_TASK_MODULE_NAME + ".export_to_recipient_backend",
            "schedule": crontab_entry.get_celery_schedule(),
            "args": (recipient_name,),
        }

    # -------------------------------------------------------------------------
    # Housekeeping once per minute
    # -------------------------------------------------------------------------
    housekeeping_crontab = CrontabEntry(minute="*", content="dummy")
    schedule["housekeeping"] = {
        "task": CELERY_TASK_MODULE_NAME + ".housekeeping",
        "schedule": housekeeping_crontab.get_celery_schedule(),
    }

    # -------------------------------------------------------------------------
    # Final Celery settings
    # -------------------------------------------------------------------------
    return {
        "beat_schedule": schedule,
        "broker_url": config.celery_broker_url,
        "timezone": config.schedule_timezone,
        "task_annotations": {
            "camcops_server.cc_modules.celery.export_task_backend": {
                "rate_limit": config.celery_export_task_rate_limit
            }
        },
        # "worker_log_color": True,  # true by default for consoles anyway
    }


# =============================================================================
# The Celery app
# =============================================================================

celery_app = Celery()
celery_app.add_defaults(get_celery_settings_dict())
# celery_app.autodiscover_tasks([CELERY_APP_NAME],
#                               related_name=CELERY_TASKS_MODULE)

_ = """

@celery_app.on_configure.connect
def _app_on_configure(**kwargs) -> None:
    log.critical("@celery_app.on_configure: {!r}", kwargs)


@celery_app.on_after_configure.connect
def _app_on_after_configure(**kwargs) -> None:
    log.critical("@celery_app.on_after_configure: {!r}", kwargs)

"""


# =============================================================================
# Test tasks
# =============================================================================


@celery_app.task(bind=True)
def debug_task(self: "CeleryTask") -> None:
    """
    Test as follows:

    .. code-block:: python

        from camcops_server.cc_modules.celery import *
        debug_task.delay()

    and also launch workers with ``camcops_server launch_workers``.

    For a bound task, the first (``self``) argument is the task instance; see
    https://docs.celeryproject.org/en/latest/userguide/tasks.html#bound-tasks

    """
    log.info(f"self: {self!r}")
    log.info(f"Backend: {current_task.backend}")


@celery_app.task
def debug_task_add(a: float, b: float) -> float:
    """
    Test as follows:

    .. code-block:: python

        from camcops_server.cc_modules.celery import *
        debug_task_add.delay(2, 3)
    """
    result = a + b
    log.info("a = {}, b = {} => a + b = {}", a, b, result)
    return result


# =============================================================================
# Exponential backoff
# =============================================================================


def backoff_delay_s(attempts: int) -> float:
    """
    Return a backoff delay, in seconds, given a number of attempts.

    The delay increases very rapidly with the number of attempts:
    1, 2, 4, 8, 16, 32, ...

    As per https://blog.balthazar-rouberol.com/celery-best-practices.

    """
    return 2.0**attempts


def jittered_delay_s() -> float:
    """
    Returns a retry delay, in seconds, that is jittered.
    """
    return random.uniform(RETRY_MIN_DELAY_S, RETRY_MAX_DELAY_S)


@contextmanager
def retry_backoff_if_raises(self: "CeleryTask") -> Generator[None, None, None]:
    """
    Context manager to retry a Celery task if an exception is raised, using a
    "backoff" method.
    """
    try:
        yield
    except Exception as exc:
        delay_s = backoff_delay_s(self.request.retries)
        log.error(
            "Task failed. Backing off. Will retry after {} s. "
            "Error was:\n{}",
            delay_s,
            exc,
        )
        self.retry(countdown=delay_s, exc=exc)


@contextmanager
def retry_jitter_if_raises(self: "CeleryTask") -> Generator[None, None, None]:
    """
    Context manager to retry a Celery task if an exception is raised, using a
    "jittered delay" method.
    """
    try:
        yield
    except Exception as exc:
        delay_s = jittered_delay_s()
        log.error(
            "Task failed. Will retry after jittered delay: {} s. "
            "Error was:\n{}",
            delay_s,
            exc,
        )
        self.retry(countdown=delay_s, exc=exc)


# =============================================================================
# Controlling tasks
# =============================================================================


def purge_jobs() -> None:
    """
    Purge all jobs from the Celery queue.
    """
    log.info("Purging back-end (Celery) jobs")
    celery_app.control.purge()
    log.info("... purged.")


# =============================================================================
# Note re request creation and context manager
# =============================================================================
# NOTE:
# - You MUST use some sort of context manager to handle requests here, because
#   the normal Pyramid router [which ordinarily calls the "finished" callbacks
#   via request._process_finished_callbacks()] will not be plumbed in.
# - For debugging, use the MySQL command
#   SELECT * FROM information_schema.innodb_locks;
#   (MySQL 5.7 and earlier; in MySQL 8.0+, query performance_schema.data_locks
#   instead).


# =============================================================================
# Export tasks
# =============================================================================


@celery_app.task(
    bind=True,
    ignore_result=True,
    max_retries=MAX_RETRIES,
    soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC,
)
def export_task_backend(
    self: "CeleryTask", recipient_name: str, basetable: str, task_pk: int
) -> None:
    """
    This function exports a single task but does so with only simple (string,
    integer) information, so it can be called via the Celery task queue.

    - Calls :func:`camcops_server.cc_modules.cc_export.export_task`.

    Args:
        self: the Celery task, :class:`celery.app.task.Task`
        recipient_name: export recipient name (as per the config file)
        basetable: name of the task's base table
        task_pk: server PK of the task
    """
    from camcops_server.cc_modules.cc_export import (
        export_task,
    )  # delayed import
    from camcops_server.cc_modules.cc_request import (
        command_line_request_context,
    )  # delayed import
    from camcops_server.cc_modules.cc_taskfactory import (
        task_factory_no_security_checks,
    )  # delayed import

    with retry_backoff_if_raises(self):
        with command_line_request_context() as req:
            recipient = req.get_export_recipient(recipient_name)
            task = task_factory_no_security_checks(
                req.dbsession, basetable, task_pk
            )
            if task is None:
                log.error(
                    "export_task_backend for recipient {!r}: No task "
                    "found for {} {}",
                    recipient_name,
                    basetable,
                    task_pk,
                )
                return
            export_task(req, recipient, task)


@celery_app.task(
    bind=True,
    ignore_result=True,
    max_retries=MAX_RETRIES,
    soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC,
)
def export_to_recipient_backend(
    self: "CeleryTask", recipient_name: str
) -> None:
    """
    From the backend, exports all pending tasks for a given recipient.

    - Calls :func:`camcops_server.cc_modules.cc_export.export`.

    There are two ways of doing this, when we call
    :func:`camcops_server.cc_modules.cc_export.export`. If we set
    ``schedule_via_backend=True``, this backend job fires up a whole bunch of
    other backend jobs, one per task to export. If we set
    ``schedule_via_backend=False``, our current backend job does all the work.

    Which is best?

    - Well, keeping it to one job is a bit simpler, perhaps.
    - But everything is locked independently so we can do the multi-job
      version, and we may as well use all the workers available. So my thought
      was to use ``schedule_via_backend=True``.
    - However, that led to database deadlocks (multiple processes trying to
      write a new ExportRecipient).
    - With some bugfixes to equality checking and a global lock (see
      :meth:`camcops_server.cc_modules.cc_config.CamcopsConfig.get_master_export_recipient_lockfilename`),
      we can try again with ``True``.
    - Yup, works nicely.

    Args:
        self: the Celery task, :class:`celery.app.task.Task`
        recipient_name: export recipient name (as per the config file)
    """
    from camcops_server.cc_modules.cc_export import export  # delayed import
    from camcops_server.cc_modules.cc_request import (
        command_line_request_context,
    )  # delayed import

    with retry_backoff_if_raises(self):
        with command_line_request_context() as req:
            export(
                req,
                recipient_names=[recipient_name],
                schedule_via_backend=True,
            )


@celery_app.task(
    bind=True,
    ignore_result=True,
    max_retries=MAX_RETRIES,
    soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC,
)
def email_basic_dump(
    self: "CeleryTask",
    collection: "TaskCollection",
    options: "DownloadOptions",
) -> None:
    """
    Send a research dump to the user via e-mail.

    Args:
        self:
            the Celery task, :class:`celery.app.task.Task`
        collection:
            a
            :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`
        options:
            :class:`camcops_server.cc_modules.cc_export.DownloadOptions`
            governing the download
    """
    from camcops_server.cc_modules.cc_export import (
        make_exporter,
    )  # delayed import
    from camcops_server.cc_modules.cc_request import (
        command_line_request_context,
    )  # delayed import

    with retry_backoff_if_raises(self):
        # Create request for a specific user, so the auditing is correct.
        with command_line_request_context(user_id=options.user_id) as req:
            collection.set_request(req)
            exporter = make_exporter(
                req=req, collection=collection, options=options
            )
            exporter.send_by_email()


@celery_app.task(
    bind=True,
    ignore_result=True,
    max_retries=MAX_RETRIES,
    soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC,
)
def create_user_download(
    self: "CeleryTask",
    collection: "TaskCollection",
    options: "DownloadOptions",
) -> None:
    """
    Create a research dump file for the user to download later.
    Let them know by e-mail.

    Args:
        self:
            the Celery task, :class:`celery.app.task.Task`
        collection:
            a
            :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`
        options:
            :class:`camcops_server.cc_modules.cc_export.DownloadOptions`
            governing the download
    """
    from camcops_server.cc_modules.cc_export import (
        make_exporter,
    )  # delayed import
    from camcops_server.cc_modules.cc_request import (
        command_line_request_context,
    )  # delayed import

    with retry_backoff_if_raises(self):
        # Create request for a specific user, so the auditing is correct.
        with command_line_request_context(user_id=options.user_id) as req:
            collection.set_request(req)
            exporter = make_exporter(
                req=req, collection=collection, options=options
            )
            exporter.create_user_download_and_email()


# =============================================================================
# Housekeeping
# =============================================================================


def delete_old_user_downloads(req: "CamcopsRequest") -> None:
    """
    Deletes user download files that are past their expiry time.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
    """
    from camcops_server.cc_modules.cc_export import (
        UserDownloadFile,
    )  # delayed import

    now = req.now
    lifetime = req.user_download_lifetime_duration
    oldest_allowed = now - lifetime
    log.debug(f"Deleting any user download files older than {oldest_allowed}")
    for root, dirs, files in os.walk(req.config.user_download_dir):
        for f in files:
            udf = UserDownloadFile(filename=f, directory=root)
            if udf.older_than(oldest_allowed):
                udf.delete()


@celery_app.task(
    bind=False, ignore_result=True, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC
)
def housekeeping() -> None:
    """
    Function that is run regularly to do cleanup tasks.

    (Remember that the ``bind`` parameter to ``@celery_app.task()`` means that
    the first argument to the function, typically called ``self``, is the
    Celery task. We don't need it here. See
    https://docs.celeryproject.org/en/latest/userguide/tasks.html#bound-tasks.)
    """
    from camcops_server.cc_modules.cc_request import (
        command_line_request_context,
    )  # delayed import
    from camcops_server.cc_modules.cc_session import (
        CamcopsSession,
    )  # delayed import
    from camcops_server.cc_modules.cc_user import (
        SecurityAccountLockout,
        SecurityLoginFailure,
    )  # delayed import

    log.debug("Housekeeping!")
    with command_line_request_context() as req:
        # ---------------------------------------------------------------------
        # Housekeeping tasks
        # ---------------------------------------------------------------------
        # We had a problem with MySQL locking here (two locks open for what
        # appeared to be a single delete, followed by a lock timeout). Seems to
        # be working now.
        CamcopsSession.delete_old_sessions(req)
        SecurityAccountLockout.delete_old_account_lockouts(req)
        SecurityLoginFailure.clear_dummy_login_failures_if_necessary(req)
        delete_old_user_downloads(req)