Coverage for tasks/kirby_mcq.py: 35%

172 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

1""" 

2camcops_server/tasks/kirby.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26""" 

27 

28import logging 

29import math 

30from typing import Dict, List, Optional, Type 

31 

32import numpy as np 

33from numpy.linalg.linalg import LinAlgError 

34from scipy.stats.mstats import gmean 

35from sqlalchemy.orm import Mapped, mapped_column 

36from sqlalchemy.sql.sqltypes import Float 

37import statsmodels.api as sm 

38 

39# noinspection PyProtectedMember 

40from statsmodels.discrete.discrete_model import BinaryResultsWrapper 

41from statsmodels.tools.sm_exceptions import PerfectSeparationError 

42 

43from camcops_server.cc_modules.cc_constants import CssClass 

44from camcops_server.cc_modules.cc_db import ( 

45 ancillary_relationship, 

46 GenericTabletRecordMixin, 

47 TaskDescendant, 

48) 

49from camcops_server.cc_modules.cc_html import answer, tr_qa 

50from camcops_server.cc_modules.cc_request import CamcopsRequest 

51from camcops_server.cc_modules.cc_sqlalchemy import Base 

52from camcops_server.cc_modules.cc_sqla_coltypes import ( 

53 CurrencyColType, 

54 mapped_bool_column, 

55) 

56from camcops_server.cc_modules.cc_summaryelement import SummaryElement 

57from camcops_server.cc_modules.cc_task import Task, TaskHasPatientMixin 

58 

59log = logging.getLogger(__name__) 

60 

61 

62# ============================================================================= 

63# KirbyRewardPair 

64# ============================================================================= 

65 

66 

67class KirbyRewardPair(object): 

68 """ 

69 Represents a pair of rewards: a small immediate reward (SIR) and a large 

70 delayed reward (LDR). 

71 """ 

72 

73 def __init__( 

74 self, 

75 sir: int, 

76 ldr: int, 

77 delay_days: int, 

78 chose_ldr: bool = None, 

79 currency: str = "£", 

80 currency_symbol_first: bool = True, 

81 ) -> None: 

82 """ 

83 Args: 

84 sir: amount of the small immediate reward (SIR) 

85 ldr: amount of the large delayed reward (LDR) 

86 delay_days: delay to the LDR, in days 

87 chose_ldr: if result also represented, did the subject choose the 

88 LDR? 

89 currency: currency symbol 

90 currency_symbol_first: symbol before amount? 

91 """ 

92 self.sir = sir 

93 self.ldr = ldr 

94 self.delay_days = delay_days 

95 self.chose_ldr = chose_ldr 

96 self.currency = currency 

97 self.currency_symbol_first = currency_symbol_first 

98 

99 def money(self, amount: int) -> str: 

100 """ 

101 Returns a currency amount, formatted. 

102 """ 

103 if self.currency_symbol_first: 

104 return f"{self.currency}{amount}" 

105 return f"{amount}{self.currency}" 

106 

107 def sir_string(self, req: CamcopsRequest) -> str: 

108 """ 

109 Returns a string representing the small immediate reward, e.g. 

110 "£10 today". 

111 """ 

112 _ = req.gettext 

113 return _("{money} today").format(money=self.money(self.sir)) 

114 

115 def ldr_string(self, req: CamcopsRequest) -> str: 

116 """ 

117 Returns a string representing the large delayed reward, e.g. 

118 "£50 in 200 days". 

119 """ 

120 _ = req.gettext 

121 return _("{money} in {days} days").format( 

122 money=self.money(self.ldr), days=self.delay_days 

123 ) 

124 

125 def question(self, req: CamcopsRequest) -> str: 

126 """ 

127 The question posed for this reward pair. 

128 """ 

129 _ = req.gettext 

130 return _("Would you prefer {sir}, or {ldr}?").format( 

131 sir=self.sir_string(req), ldr=self.ldr_string(req) 

132 ) 

133 

134 def answer(self, req: CamcopsRequest) -> str: 

135 """ 

136 Returns the subject's answer, or "?". 

137 """ 

138 if self.chose_ldr is None: 

139 return "?" 

140 return self.ldr_string(req) if self.chose_ldr else self.sir_string(req) 

141 

142 def k_indifference(self) -> float: 

143 """ 

144 Returns the value of k, the discounting parameter (units: days ^ -1) 

145 if the subject is indifferent between the two choices. 

146 

147 For calculations see :ref:`kirby_mcq.rst <kirby_mcq>`. 

148 """ 

149 a1 = self.sir 

150 a2 = self.ldr 

151 d2 = self.delay_days 

152 return (a2 - a1) / (a1 * d2) 

153 

154 def choice_consistent(self, k: float) -> bool: 

155 """ 

156 Was the choice consistent with the k value given? 

157 

158 - If no choice has been recorded, returns false. 

159 

160 - If the k value equals the implied indifference point exactly (meaning 

161 that the subject should not care), return true. 

162 """ 

163 if self.chose_ldr is None: 

164 return False 

165 k_indiff = self.k_indifference() 

166 if math.isclose(k, k_indiff): 

167 # Subject is indifferent 

168 return True 

169 # WARNING: "self.chose_ldr == k < k_indiff" FAILS. 

170 # Python will evaluate this to "(self.chose_ldr == k) < k_indiff", and 

171 # despite that evaluating to "a bool < an int", that's legal; e.g. 

172 # "False < 4" evaluates to True. 

173 # Must be bracketed like this: 

174 return self.chose_ldr == (k < k_indiff) 

175 

176 

177# ============================================================================= 

178# KirbyTrial 

179# ============================================================================= 

180 

181 

182class KirbyTrial(GenericTabletRecordMixin, TaskDescendant, Base): 

183 __tablename__ = "kirby_mcq_trials" 

184 

185 kirby_mcq_id: Mapped[int] = mapped_column(comment="FK to kirby_mcq") 

186 trial: Mapped[int] = mapped_column(comment="Trial number (1-based)") 

187 sir: Mapped[Optional[int]] = mapped_column( 

188 comment="Small immediate reward" 

189 ) 

190 ldr: Mapped[Optional[int]] = mapped_column(comment="Large delayed reward") 

191 delay_days: Mapped[Optional[int]] = mapped_column(comment="Delay in days") 

192 currency: Mapped[Optional[str]] = mapped_column( 

193 CurrencyColType, comment="Currency symbol" 

194 ) 

195 currency_symbol_first: Mapped[Optional[bool]] = mapped_bool_column( 

196 "currency_symbol_first", 

197 comment="Does the currency symbol come before the amount?", 

198 ) 

199 chose_ldr: Mapped[Optional[bool]] = mapped_bool_column( 

200 "chose_ldr", comment="Did the subject choose the large delayed reward?" 

201 ) 

202 

203 def info(self) -> KirbyRewardPair: 

204 """ 

205 Returns the trial information as a :class:`KirbyRewardPair`. 

206 """ 

207 return KirbyRewardPair( 

208 sir=self.sir, 

209 ldr=self.ldr, 

210 delay_days=self.delay_days, 

211 chose_ldr=self.chose_ldr, 

212 currency=self.currency, 

213 currency_symbol_first=self.currency_symbol_first, 

214 ) 

215 

216 def answered(self) -> bool: 

217 """ 

218 Has the subject answered this question? 

219 """ 

220 return self.chose_ldr is not None 

221 

222 # ------------------------------------------------------------------------- 

223 # TaskDescendant overrides 

224 # ------------------------------------------------------------------------- 

225 

226 @classmethod 

227 def task_ancestor_class(cls) -> Optional[Type["Task"]]: 

228 return Kirby 

229 

230 def task_ancestor(self) -> Optional["Kirby"]: 

231 return Kirby.get_linked( 

232 self.kirby_mcq_id, self 

233 ) # type: ignore[return-value] 

234 

235 

236# ============================================================================= 

237# Kirby 

238# ============================================================================= 

239 

240 

241class Kirby(TaskHasPatientMixin, Task): # type: ignore[misc] 

242 """ 

243 Server implementation of the Kirby Monetary Choice Questionnaire task. 

244 """ 

245 

246 __tablename__ = "kirby_mcq" 

247 shortname = "KirbyMCQ" 

248 

249 EXPECTED_N_TRIALS = 27 

250 

251 # No fields beyond the basics. 

252 

253 # Relationships 

254 trials = ancillary_relationship( # type: ignore[assignment] 

255 parent_class_name="Kirby", 

256 ancillary_class_name="KirbyTrial", 

257 ancillary_fk_to_parent_attr_name="kirby_mcq_id", 

258 ancillary_order_by_attr_name="trial", 

259 ) # type: List[KirbyTrial] 

260 

261 @staticmethod 

262 def longname(req: "CamcopsRequest") -> str: 

263 _ = req.gettext 

264 return _("Kirby et al. 1999 Monetary Choice Questionnaire") 

265 

266 def is_complete(self) -> bool: 

267 if len(self.trials) != self.EXPECTED_N_TRIALS: 

268 return False 

269 for t in self.trials: 

270 if not t.answered(): 

271 return False 

272 return True 

273 

274 def all_choice_results(self) -> List[KirbyRewardPair]: 

275 """ 

276 Returns a list of :class:`KirbyRewardPair` objects, one for each 

277 answered question. 

278 """ 

279 results = [] # type: List[KirbyRewardPair] 

280 for t in self.trials: 

281 if t.answered(): 

282 results.append(t.info()) 

283 return results 

284 

285 @staticmethod 

286 def n_choices_consistent(k: float, results: List[KirbyRewardPair]) -> int: 

287 """ 

288 Returns the number of choices that are consistent with the given k 

289 value. 

290 """ 

291 n_consistent = 0 

292 for pair in results: 

293 if pair.choice_consistent(k): 

294 n_consistent += 1 

295 return n_consistent 

296 

297 def k_kirby(self, results: List[KirbyRewardPair]) -> Optional[float]: 

298 """ 

299 Returns k for a subject as determined using Kirby's (2000) method. 

300 See :ref:`kirby_mcq.rst <kirby_mcq>`. 

301 """ 

302 # 1. For every k value assessed by the questions, establish the degree 

303 # of consistency. 

304 consistency = {} # type: Dict[float, int] 

305 for pair in results: 

306 k = pair.k_indifference() 

307 if k not in consistency: 

308 consistency[k] = self.n_choices_consistent(k, results) 

309 if not consistency: 

310 return None 

311 

312 # 2. Restrict to the results that are equally and maximally consistent. 

313 max_consistency = max(consistency.values()) 

314 good_k_values = [ 

315 k for k, v in consistency.items() if v == max_consistency 

316 ] 

317 

318 # 3. Take the geometric mean of those good k values. 

319 # noinspection PyTypeChecker 

320 subject_k = gmean(good_k_values) # type: np.float64 

321 

322 return float(subject_k) 

323 

324 @staticmethod 

325 def k_wileyto(results: List[KirbyRewardPair]) -> Optional[float]: 

326 """ 

327 Returns k for a subject as determined using Wileyto et al.'s (2004) 

328 method. See :ref:`kirby_mcq.rst <kirby_mcq>`. 

329 """ 

330 if not results: 

331 return None 

332 n_predictors = 2 

333 n_observations = len(results) 

334 x = np.zeros((n_observations, n_predictors)) 

335 y = np.zeros(n_observations) 

336 for i in range(n_observations): 

337 pair = results[i] 

338 a1 = pair.sir 

339 a2 = pair.ldr 

340 d2 = pair.delay_days 

341 predictor1 = 1 - (a2 / a1) 

342 predictor2 = d2 

343 x[i, 0] = predictor1 

344 x[i, 1] = predictor2 

345 y[i] = int(pair.chose_ldr) # bool to int 

346 lr = sm.Logit(y, x) 

347 try: 

348 result = lr.fit() # type: BinaryResultsWrapper 

349 except ( 

350 LinAlgError, # e.g. "singular matrix" 

351 PerfectSeparationError, 

352 ) as e: 

353 log.debug(f"sm.Logit error: {e}") 

354 return None 

355 coeffs = result.params 

356 beta1 = coeffs[0] 

357 beta2 = coeffs[1] 

358 try: 

359 k = beta2 / beta1 

360 except ZeroDivisionError: 

361 log.warning("Division by zero when calculating k = beta2 / beta1") 

362 return None 

363 return k 

364 

365 # noinspection PyUnusedLocal 

366 def get_summaries(self, req: CamcopsRequest) -> List[SummaryElement]: 

367 results = self.all_choice_results() 

368 return self.standard_task_summary_fields() + [ 

369 SummaryElement( 

370 name="k_kirby", 

371 coltype=Float(), 

372 value=self.k_kirby(results), 

373 comment="k (days^-1, Kirby 2000 method)", 

374 ), 

375 SummaryElement( 

376 name="k_wileyto", 

377 coltype=Float(), 

378 value=self.k_wileyto(results), 

379 comment="k (days^-1, Wileyto 2004 method)", 

380 ), 

381 ] 

382 

383 def get_task_html(self, req: CamcopsRequest) -> str: 

384 dp = 6 

385 qlines = [] # type: List[str] 

386 for t in self.trials: 

387 info = t.info() 

388 qlines.append( 

389 tr_qa( 

390 f"{t.trial}. {info.question(req)} " 

391 f"<i>(k<sub>indiff</sub> = " 

392 f"{round(info.k_indifference(), dp)})</i>", 

393 info.answer(req), 

394 ) 

395 ) 

396 q_a = "\n".join(qlines) 

397 results = self.all_choice_results() 

398 k_kirby = self.k_kirby(results) 

399 if k_kirby is None: 

400 inv_k_kirby = None 

401 else: 

402 inv_k_kirby = int(round(1 / k_kirby)) # round to int 

403 # ... you'd think the int() was unnecessary but it is needed 

404 k_kirby = round(k_kirby, dp) 

405 k_wileyto = self.k_wileyto(results) 

406 if k_wileyto is None: 

407 inv_k_wileyto = None 

408 else: 

409 inv_k_wileyto = int(round(1 / k_wileyto)) # round to int 

410 k_wileyto = round(k_wileyto, dp) 

411 return f""" 

412 <div class="{CssClass.SUMMARY}"> 

413 <table class="{CssClass.SUMMARY}"> 

414 {self.get_is_complete_tr(req)} 

415 <tr> 

416 <td><i>k</i> (days<sup>–1</sup>, Kirby 2000 method)</td> 

417 <td>{answer(k_kirby)} 

418 </tr> 

419 <tr> 

420 <td>1/<i>k</i> (days, Kirby method): time to half value</td> 

421 <td>{answer(inv_k_kirby)} 

422 </tr> 

423 <tr> 

424 <td><i>k</i> (days<sup>–1</sup>, Wileyto et al. 2004 method)</td> 

425 <td>{answer(k_wileyto)} 

426 </tr> 

427 <tr> 

428 <td>1/<i>k</i> (days, Wileyto method): time to half value</td> 

429 <td>{answer(inv_k_wileyto)} 

430 </tr> 

431 </table> 

432 </div> 

433 <table class="{CssClass.TASKDETAIL}"> 

434 <tr> 

435 <th width="75%">Question</th> 

436 <th width="25%">Answer</th> 

437 </tr> 

438 {q_a} 

439 </table> 

440 """ # noqa