Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2 

3""" 

4camcops_server/tasks/kirby.py 

5 

6=============================================================================== 

7 

8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com). 

9 

10 This file is part of CamCOPS. 

11 

12 CamCOPS is free software: you can redistribute it and/or modify 

13 it under the terms of the GNU General Public License as published by 

14 the Free Software Foundation, either version 3 of the License, or 

15 (at your option) any later version. 

16 

17 CamCOPS is distributed in the hope that it will be useful, 

18 but WITHOUT ANY WARRANTY; without even the implied warranty of 

19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20 GNU General Public License for more details. 

21 

22 You should have received a copy of the GNU General Public License 

23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

24 

25=============================================================================== 

26 

27""" 

28 

29import logging 

30import math 

31from typing import Dict, List, Optional, Type 

32 

33import numpy as np 

34from numpy.linalg.linalg import LinAlgError 

35from scipy.stats.mstats import gmean 

36from sqlalchemy.sql.schema import Column 

37from sqlalchemy.sql.sqltypes import Float, Integer 

38import statsmodels.api as sm 

39# noinspection PyProtectedMember 

40from statsmodels.discrete.discrete_model import BinaryResultsWrapper 

41from statsmodels.tools.sm_exceptions import PerfectSeparationError 

42 

43from camcops_server.cc_modules.cc_constants import CssClass 

44from camcops_server.cc_modules.cc_db import ( 

45 ancillary_relationship, 

46 GenericTabletRecordMixin, 

47 TaskDescendant, 

48) 

49from camcops_server.cc_modules.cc_html import ( 

50 answer, 

51 tr_qa, 

52) 

53from camcops_server.cc_modules.cc_request import CamcopsRequest 

54from camcops_server.cc_modules.cc_sqlalchemy import Base 

55from camcops_server.cc_modules.cc_sqla_coltypes import ( 

56 BoolColumn, 

57 CurrencyColType, 

58) 

59from camcops_server.cc_modules.cc_summaryelement import SummaryElement 

60from camcops_server.cc_modules.cc_task import Task, TaskHasPatientMixin 

61 

62log = logging.getLogger(__name__) 

63 

64 

65# ============================================================================= 

66# KirbyRewardPair 

67# ============================================================================= 

68 

69class KirbyRewardPair(object): 

70 """ 

71 Represents a pair of rewards: a small immediate reward (SIR) and a large 

72 delayed reward (LDR). 

73 """ 

74 def __init__(self, 

75 sir: int, 

76 ldr: int, 

77 delay_days: int, 

78 chose_ldr: bool = None, 

79 currency: str = "£", 

80 currency_symbol_first: bool = True) -> None: 

81 """ 

82 Args: 

83 sir: amount of the small immediate reward (SIR) 

84 ldr: amount of the large delayed reward (LDR) 

85 delay_days: delay to the LDR, in days 

86 chose_ldr: if result also represented, did the subject choose the 

87 LDR? 

88 currency: currency symbol 

89 currency_symbol_first: symbol before amount? 

90 """ 

91 self.sir = sir 

92 self.ldr = ldr 

93 self.delay_days = delay_days 

94 self.chose_ldr = chose_ldr 

95 self.currency = currency 

96 self.currency_symbol_first = currency_symbol_first 

97 

98 def money(self, amount: int) -> str: 

99 """ 

100 Returns a currency amount, formatted. 

101 """ 

102 if self.currency_symbol_first: 

103 return f"{self.currency}{amount}" 

104 return f"{amount}{self.currency}" 

105 

106 def sir_string(self, req: CamcopsRequest) -> str: 

107 """ 

108 Returns a string representing the small immediate reward, e.g. 

109 "£10 today". 

110 """ 

111 _ = req.gettext 

112 return _("{money} today").format(money=self.money(self.sir)) 

113 

114 def ldr_string(self, req: CamcopsRequest) -> str: 

115 """ 

116 Returns a string representing the large delayed reward, e.g. 

117 "£50 in 200 days". 

118 """ 

119 _ = req.gettext 

120 return _("{money} in {days} days").format( 

121 money=self.money(self.ldr), 

122 days=self.delay_days, 

123 ) 

124 

125 def question(self, req: CamcopsRequest) -> str: 

126 """ 

127 The question posed for this reward pair. 

128 """ 

129 _ = req.gettext 

130 return _("Would you prefer {sir}, or {ldr}?").format( 

131 sir=self.sir_string(req), 

132 ldr=self.ldr_string(req), 

133 ) 

134 

135 def answer(self, req: CamcopsRequest) -> str: 

136 """ 

137 Returns the subject's answer, or "?". 

138 """ 

139 if self.chose_ldr is None: 

140 return "?" 

141 return self.ldr_string(req) if self.chose_ldr else self.sir_string(req) 

142 

143 def k_indifference(self) -> float: 

144 """ 

145 Returns the value of k, the discounting parameter (units: days ^ -1) 

146 if the subject is indifferent between the two choices. 

147 

148 For calculations see :ref:`kirby_mcq.rst <kirby_mcq>`. 

149 """ 

150 a1 = self.sir 

151 a2 = self.ldr 

152 d2 = self.delay_days 

153 return (a2 - a1) / (a1 * d2) 

154 

155 def choice_consistent(self, k: float) -> bool: 

156 """ 

157 Was the choice consistent with the k value given? 

158 

159 - If no choice has been recorded, returns false. 

160 

161 - If the k value equals the implied indifference point exactly (meaning 

162 that the subject should not care), return true. 

163 """ 

164 if self.chose_ldr is None: 

165 return False 

166 k_indiff = self.k_indifference() 

167 if math.isclose(k, k_indiff): 

168 # Subject is indifferent 

169 return True 

170 # WARNING: "self.chose_ldr == k < k_indiff" FAILS. 

171 # Python will evaluate this to "(self.chose_ldr == k) < k_indiff", and 

172 # despite that evaluating to "a bool < an int", that's legal; e.g. 

173 # "False < 4" evaluates to True. 

174 # Must be bracketed like this: 

175 return self.chose_ldr == (k < k_indiff) 

176 

177 

178# ============================================================================= 

179# KirbyTrial 

180# ============================================================================= 

181 

182class KirbyTrial(GenericTabletRecordMixin, TaskDescendant, Base): 

183 __tablename__ = "kirby_mcq_trials" 

184 

185 kirby_mcq_id = Column( 

186 "kirby_mcq_id", Integer, 

187 nullable=False, 

188 comment="FK to kirby_mcq" 

189 ) 

190 trial = Column( 

191 "trial", Integer, 

192 nullable=False, 

193 comment="Trial number (1-based)" 

194 ) 

195 sir = Column( 

196 "sir", Integer, 

197 comment="Small immediate reward" 

198 ) 

199 ldr = Column( 

200 "ldr", Integer, 

201 comment="Large delayed reward" 

202 ) 

203 delay_days = Column( 

204 "delay_days", Integer, 

205 comment="Delay in days" 

206 ) 

207 currency = Column( 

208 "currency", CurrencyColType, 

209 comment="Currency symbol" 

210 ) 

211 currency_symbol_first = BoolColumn( 

212 "currency_symbol_first", 

213 comment="Does the currency symbol come before the amount?" 

214 ) 

215 chose_ldr = BoolColumn( 

216 "chose_ldr", 

217 comment="Did the subject choose the large delayed reward?" 

218 ) 

219 

220 def info(self) -> KirbyRewardPair: 

221 """ 

222 Returns the trial information as a :class:`KirbyRewardPair`. 

223 """ 

224 return KirbyRewardPair( 

225 sir=self.sir, 

226 ldr=self.ldr, 

227 delay_days=self.delay_days, 

228 chose_ldr=self.chose_ldr, 

229 currency=self.currency, 

230 currency_symbol_first=self.currency_symbol_first, 

231 ) 

232 

233 def answered(self) -> bool: 

234 """ 

235 Has the subject answered this question? 

236 """ 

237 return self.chose_ldr is not None 

238 

239 # ------------------------------------------------------------------------- 

240 # TaskDescendant overrides 

241 # ------------------------------------------------------------------------- 

242 

243 @classmethod 

244 def task_ancestor_class(cls) -> Optional[Type["Task"]]: 

245 return Kirby 

246 

247 def task_ancestor(self) -> Optional["Kirby"]: 

248 return Kirby.get_linked(self.kirby_mcq_id, self) 

249 

250 

251# ============================================================================= 

252# Kirby 

253# ============================================================================= 

254 

255class Kirby(TaskHasPatientMixin, Task): 

256 """ 

257 Server implementation of the Kirby Monetary Choice Questionnaire task. 

258 """ 

259 __tablename__ = "kirby_mcq" 

260 shortname = "KirbyMCQ" 

261 

262 EXPECTED_N_TRIALS = 27 

263 

264 # No fields beyond the basics. 

265 

266 # Relationships 

267 trials = ancillary_relationship( 

268 parent_class_name="Kirby", 

269 ancillary_class_name="KirbyTrial", 

270 ancillary_fk_to_parent_attr_name="kirby_mcq_id", 

271 ancillary_order_by_attr_name="trial" 

272 ) # type: List[KirbyTrial] 

273 

274 @staticmethod 

275 def longname(req: "CamcopsRequest") -> str: 

276 _ = req.gettext 

277 return _("Kirby et al. 1999 Monetary Choice Questionnaire") 

278 

279 def is_complete(self) -> bool: 

280 if len(self.trials) != self.EXPECTED_N_TRIALS: 

281 return False 

282 for t in self.trials: 

283 if not t.answered(): 

284 return False 

285 return True 

286 

287 def all_choice_results(self) -> List[KirbyRewardPair]: 

288 """ 

289 Returns a list of :class:`KirbyRewardPair` objects, one for each 

290 answered question. 

291 """ 

292 results = [] # type: List[KirbyRewardPair] 

293 for t in self.trials: 

294 if t.answered(): 

295 results.append(t.info()) 

296 return results 

297 

298 @staticmethod 

299 def n_choices_consistent(k: float, results: List[KirbyRewardPair]) -> int: 

300 """ 

301 Returns the number of choices that are consistent with the given k 

302 value. 

303 """ 

304 n_consistent = 0 

305 for pair in results: 

306 if pair.choice_consistent(k): 

307 n_consistent += 1 

308 return n_consistent 

309 

310 def k_kirby(self, results: List[KirbyRewardPair]) -> Optional[float]: 

311 """ 

312 Returns k for a subject as determined using Kirby's (2000) method. 

313 See :ref:`kirby_mcq.rst <kirby_mcq>`. 

314 """ 

315 # 1. For every k value assessed by the questions, establish the degree 

316 # of consistency. 

317 consistency = {} # type: Dict[float, int] 

318 for pair in results: 

319 k = pair.k_indifference() 

320 if k not in consistency: 

321 consistency[k] = self.n_choices_consistent(k, results) 

322 if not consistency: 

323 return None 

324 

325 # 2. Restrict to the results that are equally and maximally consistent. 

326 max_consistency = max(consistency.values()) 

327 good_k_values = [k for k, v in consistency.items() 

328 if v == max_consistency] 

329 

330 # 3. Take the geometric mean of those good k values. 

331 # noinspection PyTypeChecker 

332 subject_k = gmean(good_k_values) # type: np.float64 

333 

334 return float(subject_k) 

335 

336 @staticmethod 

337 def k_wileyto(results: List[KirbyRewardPair]) -> Optional[float]: 

338 """ 

339 Returns k for a subject as determined using Wileyto et al.'s (2004) 

340 method. See :ref:`kirby_mcq.rst <kirby_mcq>`. 

341 """ 

342 if not results: 

343 return None 

344 n_predictors = 2 

345 n_observations = len(results) 

346 x = np.zeros((n_observations, n_predictors)) 

347 y = np.zeros(n_observations) 

348 for i in range(n_observations): 

349 pair = results[i] 

350 a1 = pair.sir 

351 a2 = pair.ldr 

352 d2 = pair.delay_days 

353 predictor1 = 1 - (a2 / a1) 

354 predictor2 = d2 

355 x[i, 0] = predictor1 

356 x[i, 1] = predictor2 

357 y[i] = int(pair.chose_ldr) # bool to int 

358 lr = sm.Logit(y, x) 

359 try: 

360 result = lr.fit() # type: BinaryResultsWrapper 

361 except (LinAlgError, # e.g. "singular matrix" 

362 PerfectSeparationError) as e: 

363 log.debug(f"sm.Logit error: {e}") 

364 return None 

365 coeffs = result.params 

366 beta1 = coeffs[0] 

367 beta2 = coeffs[1] 

368 try: 

369 k = beta2 / beta1 

370 except ZeroDivisionError: 

371 log.warning("Division by zero when calculating k = beta2 / beta1") 

372 return None 

373 return k 

374 

375 # noinspection PyUnusedLocal 

376 def get_summaries(self, req: CamcopsRequest) -> List[SummaryElement]: 

377 results = self.all_choice_results() 

378 return self.standard_task_summary_fields() + [ 

379 SummaryElement( 

380 name="k_kirby", coltype=Float(), 

381 value=self.k_kirby(results), 

382 comment="k (days^-1, Kirby 2000 method)"), 

383 SummaryElement( 

384 name="k_wileyto", coltype=Float(), 

385 value=self.k_wileyto(results), 

386 comment="k (days^-1, Wileyto 2004 method)"), 

387 ] 

388 

389 def get_task_html(self, req: CamcopsRequest) -> str: 

390 dp = 6 

391 qlines = [] # type: List[str] 

392 for t in self.trials: 

393 info = t.info() 

394 qlines.append( 

395 tr_qa(f"{t.trial}. {info.question(req)} " 

396 f"<i>(k<sub>indiff</sub> = " 

397 f"{round(info.k_indifference(), dp)})</i>", 

398 info.answer(req)) 

399 ) 

400 q_a = "\n".join(qlines) 

401 results = self.all_choice_results() 

402 k_kirby = self.k_kirby(results) 

403 if k_kirby is None: 

404 inv_k_kirby = None 

405 else: 

406 inv_k_kirby = int(round(1 / k_kirby)) # round to int 

407 # ... you'd think the int() was unnecessary but it is needed 

408 k_kirby = round(k_kirby, dp) 

409 k_wileyto = self.k_wileyto(results) 

410 if k_wileyto is None: 

411 inv_k_wileyto = None 

412 else: 

413 inv_k_wileyto = int(round(1 / k_wileyto)) # round to int 

414 k_wileyto = round(k_wileyto, dp) 

415 return f""" 

416 <div class="{CssClass.SUMMARY}"> 

417 <table class="{CssClass.SUMMARY}"> 

418 {self.get_is_complete_tr(req)} 

419 <tr> 

420 <td><i>k</i> (days<sup>–1</sup>, Kirby 2000 method)</td> 

421 <td>{answer(k_kirby)} 

422 </tr> 

423 <tr> 

424 <td>1/<i>k</i> (days, Kirby method): time to half value</td> 

425 <td>{answer(inv_k_kirby)} 

426 </tr> 

427 <tr> 

428 <td><i>k</i> (days<sup>–1</sup>, Wileyto et al. 2004 method)</td> 

429 <td>{answer(k_wileyto)} 

430 </tr> 

431 <tr> 

432 <td>1/<i>k</i> (days, Wileyto method): time to half value</td> 

433 <td>{answer(inv_k_wileyto)} 

434 </tr> 

435 </table> 

436 </div> 

437 <table class="{CssClass.TASKDETAIL}"> 

438 <tr> 

439 <th width="75%">Question</th> 

440 <th width="25%">Answer</th> 

441 </tr> 

442 {q_a} 

443 </table> 

444 """ # noqa