Coverage for tasks/kirby_mcq.py : 35%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
3"""
4camcops_server/tasks/kirby.py
6===============================================================================
8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com).
10 This file is part of CamCOPS.
12 CamCOPS is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
17 CamCOPS is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License for more details.
22 You should have received a copy of the GNU General Public License
23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
25===============================================================================
27"""
29import logging
30import math
31from typing import Dict, List, Optional, Type
33import numpy as np
34from numpy.linalg.linalg import LinAlgError
35from scipy.stats.mstats import gmean
36from sqlalchemy.sql.schema import Column
37from sqlalchemy.sql.sqltypes import Float, Integer
38import statsmodels.api as sm
39# noinspection PyProtectedMember
40from statsmodels.discrete.discrete_model import BinaryResultsWrapper
41from statsmodels.tools.sm_exceptions import PerfectSeparationError
43from camcops_server.cc_modules.cc_constants import CssClass
44from camcops_server.cc_modules.cc_db import (
45 ancillary_relationship,
46 GenericTabletRecordMixin,
47 TaskDescendant,
48)
49from camcops_server.cc_modules.cc_html import (
50 answer,
51 tr_qa,
52)
53from camcops_server.cc_modules.cc_request import CamcopsRequest
54from camcops_server.cc_modules.cc_sqlalchemy import Base
55from camcops_server.cc_modules.cc_sqla_coltypes import (
56 BoolColumn,
57 CurrencyColType,
58)
59from camcops_server.cc_modules.cc_summaryelement import SummaryElement
60from camcops_server.cc_modules.cc_task import Task, TaskHasPatientMixin
62log = logging.getLogger(__name__)
65# =============================================================================
66# KirbyRewardPair
67# =============================================================================
69class KirbyRewardPair(object):
70 """
71 Represents a pair of rewards: a small immediate reward (SIR) and a large
72 delayed reward (LDR).
73 """
74 def __init__(self,
75 sir: int,
76 ldr: int,
77 delay_days: int,
78 chose_ldr: bool = None,
79 currency: str = "£",
80 currency_symbol_first: bool = True) -> None:
81 """
82 Args:
83 sir: amount of the small immediate reward (SIR)
84 ldr: amount of the large delayed reward (LDR)
85 delay_days: delay to the LDR, in days
86 chose_ldr: if result also represented, did the subject choose the
87 LDR?
88 currency: currency symbol
89 currency_symbol_first: symbol before amount?
90 """
91 self.sir = sir
92 self.ldr = ldr
93 self.delay_days = delay_days
94 self.chose_ldr = chose_ldr
95 self.currency = currency
96 self.currency_symbol_first = currency_symbol_first
98 def money(self, amount: int) -> str:
99 """
100 Returns a currency amount, formatted.
101 """
102 if self.currency_symbol_first:
103 return f"{self.currency}{amount}"
104 return f"{amount}{self.currency}"
106 def sir_string(self, req: CamcopsRequest) -> str:
107 """
108 Returns a string representing the small immediate reward, e.g.
109 "£10 today".
110 """
111 _ = req.gettext
112 return _("{money} today").format(money=self.money(self.sir))
114 def ldr_string(self, req: CamcopsRequest) -> str:
115 """
116 Returns a string representing the large delayed reward, e.g.
117 "£50 in 200 days".
118 """
119 _ = req.gettext
120 return _("{money} in {days} days").format(
121 money=self.money(self.ldr),
122 days=self.delay_days,
123 )
125 def question(self, req: CamcopsRequest) -> str:
126 """
127 The question posed for this reward pair.
128 """
129 _ = req.gettext
130 return _("Would you prefer {sir}, or {ldr}?").format(
131 sir=self.sir_string(req),
132 ldr=self.ldr_string(req),
133 )
135 def answer(self, req: CamcopsRequest) -> str:
136 """
137 Returns the subject's answer, or "?".
138 """
139 if self.chose_ldr is None:
140 return "?"
141 return self.ldr_string(req) if self.chose_ldr else self.sir_string(req)
143 def k_indifference(self) -> float:
144 """
145 Returns the value of k, the discounting parameter (units: days ^ -1)
146 if the subject is indifferent between the two choices.
148 For calculations see :ref:`kirby_mcq.rst <kirby_mcq>`.
149 """
150 a1 = self.sir
151 a2 = self.ldr
152 d2 = self.delay_days
153 return (a2 - a1) / (a1 * d2)
155 def choice_consistent(self, k: float) -> bool:
156 """
157 Was the choice consistent with the k value given?
159 - If no choice has been recorded, returns false.
161 - If the k value equals the implied indifference point exactly (meaning
162 that the subject should not care), return true.
163 """
164 if self.chose_ldr is None:
165 return False
166 k_indiff = self.k_indifference()
167 if math.isclose(k, k_indiff):
168 # Subject is indifferent
169 return True
170 # WARNING: "self.chose_ldr == k < k_indiff" FAILS.
171 # Python will evaluate this to "(self.chose_ldr == k) < k_indiff", and
172 # despite that evaluating to "a bool < an int", that's legal; e.g.
173 # "False < 4" evaluates to True.
174 # Must be bracketed like this:
175 return self.chose_ldr == (k < k_indiff)
178# =============================================================================
179# KirbyTrial
180# =============================================================================
182class KirbyTrial(GenericTabletRecordMixin, TaskDescendant, Base):
183 __tablename__ = "kirby_mcq_trials"
185 kirby_mcq_id = Column(
186 "kirby_mcq_id", Integer,
187 nullable=False,
188 comment="FK to kirby_mcq"
189 )
190 trial = Column(
191 "trial", Integer,
192 nullable=False,
193 comment="Trial number (1-based)"
194 )
195 sir = Column(
196 "sir", Integer,
197 comment="Small immediate reward"
198 )
199 ldr = Column(
200 "ldr", Integer,
201 comment="Large delayed reward"
202 )
203 delay_days = Column(
204 "delay_days", Integer,
205 comment="Delay in days"
206 )
207 currency = Column(
208 "currency", CurrencyColType,
209 comment="Currency symbol"
210 )
211 currency_symbol_first = BoolColumn(
212 "currency_symbol_first",
213 comment="Does the currency symbol come before the amount?"
214 )
215 chose_ldr = BoolColumn(
216 "chose_ldr",
217 comment="Did the subject choose the large delayed reward?"
218 )
220 def info(self) -> KirbyRewardPair:
221 """
222 Returns the trial information as a :class:`KirbyRewardPair`.
223 """
224 return KirbyRewardPair(
225 sir=self.sir,
226 ldr=self.ldr,
227 delay_days=self.delay_days,
228 chose_ldr=self.chose_ldr,
229 currency=self.currency,
230 currency_symbol_first=self.currency_symbol_first,
231 )
233 def answered(self) -> bool:
234 """
235 Has the subject answered this question?
236 """
237 return self.chose_ldr is not None
239 # -------------------------------------------------------------------------
240 # TaskDescendant overrides
241 # -------------------------------------------------------------------------
243 @classmethod
244 def task_ancestor_class(cls) -> Optional[Type["Task"]]:
245 return Kirby
247 def task_ancestor(self) -> Optional["Kirby"]:
248 return Kirby.get_linked(self.kirby_mcq_id, self)
251# =============================================================================
252# Kirby
253# =============================================================================
255class Kirby(TaskHasPatientMixin, Task):
256 """
257 Server implementation of the Kirby Monetary Choice Questionnaire task.
258 """
259 __tablename__ = "kirby_mcq"
260 shortname = "KirbyMCQ"
262 EXPECTED_N_TRIALS = 27
264 # No fields beyond the basics.
266 # Relationships
267 trials = ancillary_relationship(
268 parent_class_name="Kirby",
269 ancillary_class_name="KirbyTrial",
270 ancillary_fk_to_parent_attr_name="kirby_mcq_id",
271 ancillary_order_by_attr_name="trial"
272 ) # type: List[KirbyTrial]
274 @staticmethod
275 def longname(req: "CamcopsRequest") -> str:
276 _ = req.gettext
277 return _("Kirby et al. 1999 Monetary Choice Questionnaire")
279 def is_complete(self) -> bool:
280 if len(self.trials) != self.EXPECTED_N_TRIALS:
281 return False
282 for t in self.trials:
283 if not t.answered():
284 return False
285 return True
287 def all_choice_results(self) -> List[KirbyRewardPair]:
288 """
289 Returns a list of :class:`KirbyRewardPair` objects, one for each
290 answered question.
291 """
292 results = [] # type: List[KirbyRewardPair]
293 for t in self.trials:
294 if t.answered():
295 results.append(t.info())
296 return results
298 @staticmethod
299 def n_choices_consistent(k: float, results: List[KirbyRewardPair]) -> int:
300 """
301 Returns the number of choices that are consistent with the given k
302 value.
303 """
304 n_consistent = 0
305 for pair in results:
306 if pair.choice_consistent(k):
307 n_consistent += 1
308 return n_consistent
310 def k_kirby(self, results: List[KirbyRewardPair]) -> Optional[float]:
311 """
312 Returns k for a subject as determined using Kirby's (2000) method.
313 See :ref:`kirby_mcq.rst <kirby_mcq>`.
314 """
315 # 1. For every k value assessed by the questions, establish the degree
316 # of consistency.
317 consistency = {} # type: Dict[float, int]
318 for pair in results:
319 k = pair.k_indifference()
320 if k not in consistency:
321 consistency[k] = self.n_choices_consistent(k, results)
322 if not consistency:
323 return None
325 # 2. Restrict to the results that are equally and maximally consistent.
326 max_consistency = max(consistency.values())
327 good_k_values = [k for k, v in consistency.items()
328 if v == max_consistency]
330 # 3. Take the geometric mean of those good k values.
331 # noinspection PyTypeChecker
332 subject_k = gmean(good_k_values) # type: np.float64
334 return float(subject_k)
336 @staticmethod
337 def k_wileyto(results: List[KirbyRewardPair]) -> Optional[float]:
338 """
339 Returns k for a subject as determined using Wileyto et al.'s (2004)
340 method. See :ref:`kirby_mcq.rst <kirby_mcq>`.
341 """
342 if not results:
343 return None
344 n_predictors = 2
345 n_observations = len(results)
346 x = np.zeros((n_observations, n_predictors))
347 y = np.zeros(n_observations)
348 for i in range(n_observations):
349 pair = results[i]
350 a1 = pair.sir
351 a2 = pair.ldr
352 d2 = pair.delay_days
353 predictor1 = 1 - (a2 / a1)
354 predictor2 = d2
355 x[i, 0] = predictor1
356 x[i, 1] = predictor2
357 y[i] = int(pair.chose_ldr) # bool to int
358 lr = sm.Logit(y, x)
359 try:
360 result = lr.fit() # type: BinaryResultsWrapper
361 except (LinAlgError, # e.g. "singular matrix"
362 PerfectSeparationError) as e:
363 log.debug(f"sm.Logit error: {e}")
364 return None
365 coeffs = result.params
366 beta1 = coeffs[0]
367 beta2 = coeffs[1]
368 try:
369 k = beta2 / beta1
370 except ZeroDivisionError:
371 log.warning("Division by zero when calculating k = beta2 / beta1")
372 return None
373 return k
375 # noinspection PyUnusedLocal
376 def get_summaries(self, req: CamcopsRequest) -> List[SummaryElement]:
377 results = self.all_choice_results()
378 return self.standard_task_summary_fields() + [
379 SummaryElement(
380 name="k_kirby", coltype=Float(),
381 value=self.k_kirby(results),
382 comment="k (days^-1, Kirby 2000 method)"),
383 SummaryElement(
384 name="k_wileyto", coltype=Float(),
385 value=self.k_wileyto(results),
386 comment="k (days^-1, Wileyto 2004 method)"),
387 ]
389 def get_task_html(self, req: CamcopsRequest) -> str:
390 dp = 6
391 qlines = [] # type: List[str]
392 for t in self.trials:
393 info = t.info()
394 qlines.append(
395 tr_qa(f"{t.trial}. {info.question(req)} "
396 f"<i>(k<sub>indiff</sub> = "
397 f"{round(info.k_indifference(), dp)})</i>",
398 info.answer(req))
399 )
400 q_a = "\n".join(qlines)
401 results = self.all_choice_results()
402 k_kirby = self.k_kirby(results)
403 if k_kirby is None:
404 inv_k_kirby = None
405 else:
406 inv_k_kirby = int(round(1 / k_kirby)) # round to int
407 # ... you'd think the int() was unnecessary but it is needed
408 k_kirby = round(k_kirby, dp)
409 k_wileyto = self.k_wileyto(results)
410 if k_wileyto is None:
411 inv_k_wileyto = None
412 else:
413 inv_k_wileyto = int(round(1 / k_wileyto)) # round to int
414 k_wileyto = round(k_wileyto, dp)
415 return f"""
416 <div class="{CssClass.SUMMARY}">
417 <table class="{CssClass.SUMMARY}">
418 {self.get_is_complete_tr(req)}
419 <tr>
420 <td><i>k</i> (days<sup>–1</sup>, Kirby 2000 method)</td>
421 <td>{answer(k_kirby)}
422 </tr>
423 <tr>
424 <td>1/<i>k</i> (days, Kirby method): time to half value</td>
425 <td>{answer(inv_k_kirby)}
426 </tr>
427 <tr>
428 <td><i>k</i> (days<sup>–1</sup>, Wileyto et al. 2004 method)</td>
429 <td>{answer(k_wileyto)}
430 </tr>
431 <tr>
432 <td>1/<i>k</i> (days, Wileyto method): time to half value</td>
433 <td>{answer(inv_k_wileyto)}
434 </tr>
435 </table>
436 </div>
437 <table class="{CssClass.TASKDETAIL}">
438 <tr>
439 <th width="75%">Question</th>
440 <th width="25%">Answer</th>
441 </tr>
442 {q_a}
443 </table>
444 """ # noqa