Coverage for tasks/kirby_mcq.py: 35%
172 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
1"""
2camcops_server/tasks/kirby.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26"""
28import logging
29import math
30from typing import Dict, List, Optional, Type
32import numpy as np
33from numpy.linalg.linalg import LinAlgError
34from scipy.stats.mstats import gmean
35from sqlalchemy.orm import Mapped, mapped_column
36from sqlalchemy.sql.sqltypes import Float
37import statsmodels.api as sm
39# noinspection PyProtectedMember
40from statsmodels.discrete.discrete_model import BinaryResultsWrapper
41from statsmodels.tools.sm_exceptions import PerfectSeparationError
43from camcops_server.cc_modules.cc_constants import CssClass
44from camcops_server.cc_modules.cc_db import (
45 ancillary_relationship,
46 GenericTabletRecordMixin,
47 TaskDescendant,
48)
49from camcops_server.cc_modules.cc_html import answer, tr_qa
50from camcops_server.cc_modules.cc_request import CamcopsRequest
51from camcops_server.cc_modules.cc_sqlalchemy import Base
52from camcops_server.cc_modules.cc_sqla_coltypes import (
53 CurrencyColType,
54 mapped_bool_column,
55)
56from camcops_server.cc_modules.cc_summaryelement import SummaryElement
57from camcops_server.cc_modules.cc_task import Task, TaskHasPatientMixin
59log = logging.getLogger(__name__)
62# =============================================================================
63# KirbyRewardPair
64# =============================================================================
67class KirbyRewardPair(object):
68 """
69 Represents a pair of rewards: a small immediate reward (SIR) and a large
70 delayed reward (LDR).
71 """
73 def __init__(
74 self,
75 sir: int,
76 ldr: int,
77 delay_days: int,
78 chose_ldr: bool = None,
79 currency: str = "£",
80 currency_symbol_first: bool = True,
81 ) -> None:
82 """
83 Args:
84 sir: amount of the small immediate reward (SIR)
85 ldr: amount of the large delayed reward (LDR)
86 delay_days: delay to the LDR, in days
87 chose_ldr: if result also represented, did the subject choose the
88 LDR?
89 currency: currency symbol
90 currency_symbol_first: symbol before amount?
91 """
92 self.sir = sir
93 self.ldr = ldr
94 self.delay_days = delay_days
95 self.chose_ldr = chose_ldr
96 self.currency = currency
97 self.currency_symbol_first = currency_symbol_first
99 def money(self, amount: int) -> str:
100 """
101 Returns a currency amount, formatted.
102 """
103 if self.currency_symbol_first:
104 return f"{self.currency}{amount}"
105 return f"{amount}{self.currency}"
107 def sir_string(self, req: CamcopsRequest) -> str:
108 """
109 Returns a string representing the small immediate reward, e.g.
110 "£10 today".
111 """
112 _ = req.gettext
113 return _("{money} today").format(money=self.money(self.sir))
115 def ldr_string(self, req: CamcopsRequest) -> str:
116 """
117 Returns a string representing the large delayed reward, e.g.
118 "£50 in 200 days".
119 """
120 _ = req.gettext
121 return _("{money} in {days} days").format(
122 money=self.money(self.ldr), days=self.delay_days
123 )
125 def question(self, req: CamcopsRequest) -> str:
126 """
127 The question posed for this reward pair.
128 """
129 _ = req.gettext
130 return _("Would you prefer {sir}, or {ldr}?").format(
131 sir=self.sir_string(req), ldr=self.ldr_string(req)
132 )
134 def answer(self, req: CamcopsRequest) -> str:
135 """
136 Returns the subject's answer, or "?".
137 """
138 if self.chose_ldr is None:
139 return "?"
140 return self.ldr_string(req) if self.chose_ldr else self.sir_string(req)
142 def k_indifference(self) -> float:
143 """
144 Returns the value of k, the discounting parameter (units: days ^ -1)
145 if the subject is indifferent between the two choices.
147 For calculations see :ref:`kirby_mcq.rst <kirby_mcq>`.
148 """
149 a1 = self.sir
150 a2 = self.ldr
151 d2 = self.delay_days
152 return (a2 - a1) / (a1 * d2)
154 def choice_consistent(self, k: float) -> bool:
155 """
156 Was the choice consistent with the k value given?
158 - If no choice has been recorded, returns false.
160 - If the k value equals the implied indifference point exactly (meaning
161 that the subject should not care), return true.
162 """
163 if self.chose_ldr is None:
164 return False
165 k_indiff = self.k_indifference()
166 if math.isclose(k, k_indiff):
167 # Subject is indifferent
168 return True
169 # WARNING: "self.chose_ldr == k < k_indiff" FAILS.
170 # Python will evaluate this to "(self.chose_ldr == k) < k_indiff", and
171 # despite that evaluating to "a bool < an int", that's legal; e.g.
172 # "False < 4" evaluates to True.
173 # Must be bracketed like this:
174 return self.chose_ldr == (k < k_indiff)
177# =============================================================================
178# KirbyTrial
179# =============================================================================
182class KirbyTrial(GenericTabletRecordMixin, TaskDescendant, Base):
183 __tablename__ = "kirby_mcq_trials"
185 kirby_mcq_id: Mapped[int] = mapped_column(comment="FK to kirby_mcq")
186 trial: Mapped[int] = mapped_column(comment="Trial number (1-based)")
187 sir: Mapped[Optional[int]] = mapped_column(
188 comment="Small immediate reward"
189 )
190 ldr: Mapped[Optional[int]] = mapped_column(comment="Large delayed reward")
191 delay_days: Mapped[Optional[int]] = mapped_column(comment="Delay in days")
192 currency: Mapped[Optional[str]] = mapped_column(
193 CurrencyColType, comment="Currency symbol"
194 )
195 currency_symbol_first: Mapped[Optional[bool]] = mapped_bool_column(
196 "currency_symbol_first",
197 comment="Does the currency symbol come before the amount?",
198 )
199 chose_ldr: Mapped[Optional[bool]] = mapped_bool_column(
200 "chose_ldr", comment="Did the subject choose the large delayed reward?"
201 )
203 def info(self) -> KirbyRewardPair:
204 """
205 Returns the trial information as a :class:`KirbyRewardPair`.
206 """
207 return KirbyRewardPair(
208 sir=self.sir,
209 ldr=self.ldr,
210 delay_days=self.delay_days,
211 chose_ldr=self.chose_ldr,
212 currency=self.currency,
213 currency_symbol_first=self.currency_symbol_first,
214 )
216 def answered(self) -> bool:
217 """
218 Has the subject answered this question?
219 """
220 return self.chose_ldr is not None
222 # -------------------------------------------------------------------------
223 # TaskDescendant overrides
224 # -------------------------------------------------------------------------
226 @classmethod
227 def task_ancestor_class(cls) -> Optional[Type["Task"]]:
228 return Kirby
230 def task_ancestor(self) -> Optional["Kirby"]:
231 return Kirby.get_linked(
232 self.kirby_mcq_id, self
233 ) # type: ignore[return-value]
236# =============================================================================
237# Kirby
238# =============================================================================
241class Kirby(TaskHasPatientMixin, Task): # type: ignore[misc]
242 """
243 Server implementation of the Kirby Monetary Choice Questionnaire task.
244 """
246 __tablename__ = "kirby_mcq"
247 shortname = "KirbyMCQ"
249 EXPECTED_N_TRIALS = 27
251 # No fields beyond the basics.
253 # Relationships
254 trials = ancillary_relationship( # type: ignore[assignment]
255 parent_class_name="Kirby",
256 ancillary_class_name="KirbyTrial",
257 ancillary_fk_to_parent_attr_name="kirby_mcq_id",
258 ancillary_order_by_attr_name="trial",
259 ) # type: List[KirbyTrial]
261 @staticmethod
262 def longname(req: "CamcopsRequest") -> str:
263 _ = req.gettext
264 return _("Kirby et al. 1999 Monetary Choice Questionnaire")
266 def is_complete(self) -> bool:
267 if len(self.trials) != self.EXPECTED_N_TRIALS:
268 return False
269 for t in self.trials:
270 if not t.answered():
271 return False
272 return True
274 def all_choice_results(self) -> List[KirbyRewardPair]:
275 """
276 Returns a list of :class:`KirbyRewardPair` objects, one for each
277 answered question.
278 """
279 results = [] # type: List[KirbyRewardPair]
280 for t in self.trials:
281 if t.answered():
282 results.append(t.info())
283 return results
285 @staticmethod
286 def n_choices_consistent(k: float, results: List[KirbyRewardPair]) -> int:
287 """
288 Returns the number of choices that are consistent with the given k
289 value.
290 """
291 n_consistent = 0
292 for pair in results:
293 if pair.choice_consistent(k):
294 n_consistent += 1
295 return n_consistent
297 def k_kirby(self, results: List[KirbyRewardPair]) -> Optional[float]:
298 """
299 Returns k for a subject as determined using Kirby's (2000) method.
300 See :ref:`kirby_mcq.rst <kirby_mcq>`.
301 """
302 # 1. For every k value assessed by the questions, establish the degree
303 # of consistency.
304 consistency = {} # type: Dict[float, int]
305 for pair in results:
306 k = pair.k_indifference()
307 if k not in consistency:
308 consistency[k] = self.n_choices_consistent(k, results)
309 if not consistency:
310 return None
312 # 2. Restrict to the results that are equally and maximally consistent.
313 max_consistency = max(consistency.values())
314 good_k_values = [
315 k for k, v in consistency.items() if v == max_consistency
316 ]
318 # 3. Take the geometric mean of those good k values.
319 # noinspection PyTypeChecker
320 subject_k = gmean(good_k_values) # type: np.float64
322 return float(subject_k)
324 @staticmethod
325 def k_wileyto(results: List[KirbyRewardPair]) -> Optional[float]:
326 """
327 Returns k for a subject as determined using Wileyto et al.'s (2004)
328 method. See :ref:`kirby_mcq.rst <kirby_mcq>`.
329 """
330 if not results:
331 return None
332 n_predictors = 2
333 n_observations = len(results)
334 x = np.zeros((n_observations, n_predictors))
335 y = np.zeros(n_observations)
336 for i in range(n_observations):
337 pair = results[i]
338 a1 = pair.sir
339 a2 = pair.ldr
340 d2 = pair.delay_days
341 predictor1 = 1 - (a2 / a1)
342 predictor2 = d2
343 x[i, 0] = predictor1
344 x[i, 1] = predictor2
345 y[i] = int(pair.chose_ldr) # bool to int
346 lr = sm.Logit(y, x)
347 try:
348 result = lr.fit() # type: BinaryResultsWrapper
349 except (
350 LinAlgError, # e.g. "singular matrix"
351 PerfectSeparationError,
352 ) as e:
353 log.debug(f"sm.Logit error: {e}")
354 return None
355 coeffs = result.params
356 beta1 = coeffs[0]
357 beta2 = coeffs[1]
358 try:
359 k = beta2 / beta1
360 except ZeroDivisionError:
361 log.warning("Division by zero when calculating k = beta2 / beta1")
362 return None
363 return k
365 # noinspection PyUnusedLocal
366 def get_summaries(self, req: CamcopsRequest) -> List[SummaryElement]:
367 results = self.all_choice_results()
368 return self.standard_task_summary_fields() + [
369 SummaryElement(
370 name="k_kirby",
371 coltype=Float(),
372 value=self.k_kirby(results),
373 comment="k (days^-1, Kirby 2000 method)",
374 ),
375 SummaryElement(
376 name="k_wileyto",
377 coltype=Float(),
378 value=self.k_wileyto(results),
379 comment="k (days^-1, Wileyto 2004 method)",
380 ),
381 ]
383 def get_task_html(self, req: CamcopsRequest) -> str:
384 dp = 6
385 qlines = [] # type: List[str]
386 for t in self.trials:
387 info = t.info()
388 qlines.append(
389 tr_qa(
390 f"{t.trial}. {info.question(req)} "
391 f"<i>(k<sub>indiff</sub> = "
392 f"{round(info.k_indifference(), dp)})</i>",
393 info.answer(req),
394 )
395 )
396 q_a = "\n".join(qlines)
397 results = self.all_choice_results()
398 k_kirby = self.k_kirby(results)
399 if k_kirby is None:
400 inv_k_kirby = None
401 else:
402 inv_k_kirby = int(round(1 / k_kirby)) # round to int
403 # ... you'd think the int() was unnecessary but it is needed
404 k_kirby = round(k_kirby, dp)
405 k_wileyto = self.k_wileyto(results)
406 if k_wileyto is None:
407 inv_k_wileyto = None
408 else:
409 inv_k_wileyto = int(round(1 / k_wileyto)) # round to int
410 k_wileyto = round(k_wileyto, dp)
411 return f"""
412 <div class="{CssClass.SUMMARY}">
413 <table class="{CssClass.SUMMARY}">
414 {self.get_is_complete_tr(req)}
415 <tr>
416 <td><i>k</i> (days<sup>–1</sup>, Kirby 2000 method)</td>
417 <td>{answer(k_kirby)}
418 </tr>
419 <tr>
420 <td>1/<i>k</i> (days, Kirby method): time to half value</td>
421 <td>{answer(inv_k_kirby)}
422 </tr>
423 <tr>
424 <td><i>k</i> (days<sup>–1</sup>, Wileyto et al. 2004 method)</td>
425 <td>{answer(k_wileyto)}
426 </tr>
427 <tr>
428 <td>1/<i>k</i> (days, Wileyto method): time to half value</td>
429 <td>{answer(inv_k_wileyto)}
430 </tr>
431 </table>
432 </div>
433 <table class="{CssClass.TASKDETAIL}">
434 <tr>
435 <th width="75%">Question</th>
436 <th width="25%">Answer</th>
437 </tr>
438 {q_a}
439 </table>
440 """ # noqa