Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2 

3""" 

4camcops_server/cc_modules/cc_user.py 

5 

6=============================================================================== 

7 

8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com). 

9 

10 This file is part of CamCOPS. 

11 

12 CamCOPS is free software: you can redistribute it and/or modify 

13 it under the terms of the GNU General Public License as published by 

14 the Free Software Foundation, either version 3 of the License, or 

15 (at your option) any later version. 

16 

17 CamCOPS is distributed in the hope that it will be useful, 

18 but WITHOUT ANY WARRANTY; without even the implied warranty of 

19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20 GNU General Public License for more details. 

21 

22 You should have received a copy of the GNU General Public License 

23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

24 

25=============================================================================== 

26 

27**CamCOPS users.** 

28 

29""" 

30 

31import datetime 

32import logging 

33import re 

34from typing import List, Optional, Set, Tuple, TYPE_CHECKING 

35 

36import cardinal_pythonlib.crypto as rnc_crypto 

37from cardinal_pythonlib.datetimefunc import convert_datetime_to_local 

38from cardinal_pythonlib.logs import BraceStyleAdapter 

39from cardinal_pythonlib.reprfunc import simple_repr 

40from cardinal_pythonlib.sqlalchemy.orm_query import ( 

41 CountStarSpecializedQuery, 

42 exists_orm, 

43) 

44from pendulum import DateTime as Pendulum 

45from sqlalchemy.ext.associationproxy import association_proxy 

46from sqlalchemy.orm import relationship, Session as SqlASession, Query 

47from sqlalchemy.sql import false 

48from sqlalchemy.sql.expression import and_, exists, not_ 

49from sqlalchemy.sql.functions import func 

50from sqlalchemy.sql.schema import Column, ForeignKey 

51from sqlalchemy.sql.sqltypes import Boolean, DateTime, Integer 

52 

53from camcops_server.cc_modules.cc_audit import audit 

54from camcops_server.cc_modules.cc_constants import USER_NAME_FOR_SYSTEM 

55from camcops_server.cc_modules.cc_group import Group 

56from camcops_server.cc_modules.cc_membership import UserGroupMembership 

57from camcops_server.cc_modules.cc_sqla_coltypes import ( 

58 EmailAddressColType, 

59 FullNameColType, 

60 HashedPasswordColType, 

61 LanguageCodeColType, 

62 PendulumDateTimeAsIsoTextColType, 

63 UserNameCamcopsColType, 

64) 

65from camcops_server.cc_modules.cc_sqlalchemy import Base 

66from camcops_server.cc_modules.cc_text import TERMS_CONDITIONS_UPDATE_DATE 

67 

68if TYPE_CHECKING: 

69 from camcops_server.cc_modules.cc_patient import Patient 

70 from camcops_server.cc_modules.cc_request import CamcopsRequest 

71 

72log = BraceStyleAdapter(logging.getLogger(__name__)) 

73 

74 

75# ============================================================================= 

76# Constants 

77# ============================================================================= 

78 

79VALID_USERNAME_REGEX = "^[A-Za-z0-9_-]+$" 

80BCRYPT_DEFAULT_LOG_ROUNDS = 6 

81# Default is 12, but it does impact on the tablet upload speed (cost per 

82# transaction). Time is expected to be proportional to 2^n, i.e. incrementing 1 

83# increases time by a factor of 2. 

84# Empirically, on egret: 

85# 2^12 rounds takes around 400 ms 

86# 2^8 rounds takes around 30 ms (as expected, 1/16 of the time as for 12) 

87# we'd like around 8 ms; http://security.stackexchange.com/questions/17207 

88# ... so we should be using 12 + log(8/400)/log(2) = 6 rounds 

89 

90CLEAR_DUMMY_LOGIN_FREQUENCY_DAYS = 7 

91CLEAR_DUMMY_LOGIN_PERIOD = datetime.timedelta( 

92 days=CLEAR_DUMMY_LOGIN_FREQUENCY_DAYS) 

93 

94 

95# ============================================================================= 

96# SecurityAccountLockout 

97# ============================================================================= 

98# Note that we record login failures for non-existent users, and pretend 

99# they're locked out (to prevent username discovery that way, by timing) 

100 

101class SecurityAccountLockout(Base): 

102 """ 

103 Represents an account "lockout". 

104 """ 

105 __tablename__ = "_security_account_lockouts" 

106 

107 id = Column("id", Integer, primary_key=True, autoincrement=True) 

108 username = Column( 

109 "username", UserNameCamcopsColType, 

110 nullable=False, index=True, 

111 comment="User name (which may be a non-existent user, to prevent " 

112 "subtle username discovery by careful timing)" 

113 ) 

114 locked_until = Column( 

115 "locked_until", DateTime, 

116 nullable=False, index=True, 

117 comment="Account is locked until (UTC)" 

118 ) 

119 

120 @classmethod 

121 def delete_old_account_lockouts(cls, req: "CamcopsRequest") -> None: 

122 """ 

123 Delete all expired account lockouts. 

124 """ 

125 dbsession = req.dbsession 

126 now = req.now_utc 

127 dbsession.query(cls)\ 

128 .filter(cls.locked_until <= now)\ 

129 .delete(synchronize_session=False) 

130 

131 @classmethod 

132 def is_user_locked_out(cls, req: "CamcopsRequest", username: str) -> bool: 

133 """ 

134 Is the specified user locked out? 

135 

136 Args: 

137 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

138 username: the user's username 

139 """ 

140 dbsession = req.dbsession 

141 now = req.now_utc 

142 return exists_orm(dbsession, cls, 

143 cls.username == username, 

144 cls.locked_until > now) 

145 

146 @classmethod 

147 def user_locked_out_until(cls, req: "CamcopsRequest", 

148 username: str) -> Optional[Pendulum]: 

149 """ 

150 When is the user locked out until? 

151 

152 Args: 

153 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

154 username: the user's username 

155 

156 Returns: 

157 Pendulum datetime in local timezone (or ``None`` if not 

158 locked out). 

159 """ 

160 dbsession = req.dbsession 

161 now = req.now_utc 

162 locked_until_utc = dbsession.query(func.max(cls.locked_until))\ 

163 .filter(cls.username == username)\ 

164 .filter(cls.locked_until > now)\ 

165 .scalar() # type: Optional[Pendulum] 

166 # ... NOT first(), which returns (result,); we want just result 

167 if not locked_until_utc: 

168 return None 

169 return convert_datetime_to_local(locked_until_utc) 

170 

171 @classmethod 

172 def lock_user_out(cls, req: "CamcopsRequest", 

173 username: str, lockout_minutes: int) -> None: 

174 """ 

175 Lock user out for a specified number of minutes. 

176 

177 Args: 

178 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

179 username: the user's username 

180 lockout_minutes: number of minutes 

181 """ 

182 dbsession = req.dbsession 

183 now = req.now_utc 

184 lock_until = now + datetime.timedelta(minutes=lockout_minutes) 

185 # noinspection PyArgumentList 

186 lock = cls(username=username, locked_until=lock_until) 

187 dbsession.add(lock) 

188 audit(req, 

189 f"Account {username} locked out for {lockout_minutes} minutes") 

190 

191 @classmethod 

192 def unlock_user(cls, req: "CamcopsRequest", username: str) -> None: 

193 """ 

194 Unlock a user. 

195 

196 Args: 

197 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

198 username: the user's username 

199 """ 

200 dbsession = req.dbsession 

201 dbsession.query(cls)\ 

202 .filter(cls.username == username)\ 

203 .delete(synchronize_session=False) 

204 

205 

206# ============================================================================= 

207# SecurityLoginFailure 

208# ============================================================================= 

209 

210class SecurityLoginFailure(Base): 

211 """ 

212 Represents a record of a failed login. 

213 

214 Too many failed logins lead to a lockout; see 

215 :class:`SecurityAccountLockout`. 

216 """ 

217 __tablename__ = "_security_login_failures" 

218 

219 id = Column("id", Integer, primary_key=True, autoincrement=True) 

220 username = Column( 

221 "username", UserNameCamcopsColType, 

222 nullable=False, index=True, 

223 comment="User name (which may be a non-existent user, to prevent " 

224 "subtle username discovery by careful timing)" 

225 ) 

226 login_failure_at = Column( 

227 "login_failure_at", DateTime, 

228 nullable=False, index=True, 

229 comment="Login failure occurred at (UTC)" 

230 ) 

231 

232 @classmethod 

233 def record_login_failure(cls, req: "CamcopsRequest", 

234 username: str) -> None: 

235 """ 

236 Record that a user has failed to log in. 

237 

238 Args: 

239 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

240 username: the user's username 

241 """ 

242 dbsession = req.dbsession 

243 now = req.now_utc 

244 # noinspection PyArgumentList 

245 failure = cls(username=username, login_failure_at=now) 

246 dbsession.add(failure) 

247 

248 @classmethod 

249 def act_on_login_failure(cls, req: "CamcopsRequest", 

250 username: str) -> None: 

251 """ 

252 Record login failure and lock out user if necessary. 

253 

254 Args: 

255 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

256 username: the user's username 

257 """ 

258 cfg = req.config 

259 audit(req, f"Failed login as user: {username}") 

260 cls.record_login_failure(req, username) 

261 nfailures = cls.how_many_login_failures(req, username) 

262 nlockouts = nfailures // cfg.lockout_threshold 

263 nfailures_since_last_lockout = nfailures % cfg.lockout_threshold 

264 if nlockouts >= 1 and nfailures_since_last_lockout == 0: 

265 # new lockout required 

266 lockout_minutes = nlockouts * \ 

267 cfg.lockout_duration_increment_minutes 

268 SecurityAccountLockout.lock_user_out(req, username, 

269 lockout_minutes) 

270 

271 @classmethod 

272 def clear_login_failures(cls, req: "CamcopsRequest", 

273 username: str) -> None: 

274 """ 

275 Clear login failures for a user. 

276 

277 Args: 

278 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

279 username: the user's username 

280 """ 

281 dbsession = req.dbsession 

282 dbsession.query(cls)\ 

283 .filter(cls.username == username)\ 

284 .delete(synchronize_session=False) 

285 

286 @classmethod 

287 def how_many_login_failures(cls, req: "CamcopsRequest", 

288 username: str) -> int: 

289 """ 

290 How many times has the user tried and failed to log in (recently)? 

291 

292 Args: 

293 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

294 username: the user's username 

295 """ 

296 dbsession = req.dbsession 

297 q = CountStarSpecializedQuery([cls], session=dbsession)\ 

298 .filter(cls.username == username) 

299 return q.count_star() 

300 

301 @classmethod 

302 def enable_user(cls, req: "CamcopsRequest", username: str) -> None: 

303 """ 

304 Unlock user and clear login failures. 

305 

306 Args: 

307 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

308 username: the user's username 

309 """ 

310 SecurityAccountLockout.unlock_user(req, username) 

311 cls.clear_login_failures(req, username) 

312 audit(req, f"User {username} re-enabled") 

313 

314 @classmethod 

315 def clear_login_failures_for_nonexistent_users( 

316 cls, req: "CamcopsRequest") -> None: 

317 """ 

318 Clear login failures for nonexistent users. 

319 

320 Login failues are recorded for nonexistent users to mimic the lockout 

321 seen for real users, i.e. to reduce the potential for username 

322 discovery. 

323 

324 Args: 

325 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

326 """ 

327 dbsession = req.dbsession 

328 all_user_names = dbsession.query(User.username) 

329 dbsession.query(cls)\ 

330 .filter(cls.username.notin_(all_user_names))\ 

331 .delete(synchronize_session=False) 

332 # https://stackoverflow.com/questions/26182027/how-to-use-not-in-clause-in-sqlalchemy-orm-query # noqa 

333 

334 @classmethod 

335 def clear_dummy_login_failures_if_necessary(cls, 

336 req: "CamcopsRequest") -> None: 

337 """ 

338 Clear dummy login failures if we haven't done so for a while. 

339 

340 Not too often! See :data:`CLEAR_DUMMY_LOGIN_PERIOD`. 

341 

342 Args: 

343 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

344 """ 

345 now = req.now_utc 

346 ss = req.server_settings 

347 last_dummy_login_failure_clearance = ss.get_last_dummy_login_failure_clearance_pendulum() # noqa 

348 if last_dummy_login_failure_clearance is not None: 

349 elapsed = now - last_dummy_login_failure_clearance 

350 if elapsed < CLEAR_DUMMY_LOGIN_PERIOD: 

351 # We cleared it recently. 

352 return 

353 

354 cls.clear_login_failures_for_nonexistent_users(req) 

355 log.debug("Dummy login failures cleared.") 

356 ss.last_dummy_login_failure_clearance_at_utc = now 

357 

358 

359# ============================================================================= 

360# User class 

361# ============================================================================= 

362 

363class User(Base): 

364 """ 

365 Class representing a user. 

366 """ 

367 __tablename__ = "_security_users" 

368 

369 # ------------------------------------------------------------------------- 

370 # Columns 

371 # ------------------------------------------------------------------------- 

372 

373 id = Column( 

374 "id", Integer, 

375 primary_key=True, autoincrement=True, index=True, 

376 comment="User ID" 

377 ) 

378 username = Column( 

379 "username", UserNameCamcopsColType, 

380 nullable=False, index=True, unique=True, 

381 comment="User name" 

382 ) # type: str 

383 fullname = Column( 

384 "fullname", FullNameColType, 

385 comment="User's full name" 

386 ) 

387 email = Column( 

388 "email", EmailAddressColType, 

389 comment="User's e-mail address" 

390 ) 

391 hashedpw = Column( 

392 "hashedpw", HashedPasswordColType, 

393 nullable=False, 

394 comment="Password hash" 

395 ) 

396 last_login_at_utc = Column( 

397 "last_login_at_utc", DateTime, 

398 comment="Date/time this user last logged in (UTC)" 

399 ) 

400 last_password_change_utc = Column( 

401 "last_password_change_utc", DateTime, 

402 comment="Date/time this user last changed their password (UTC)" 

403 ) 

404 superuser = Column( 

405 "superuser", Boolean, 

406 default=False, 

407 comment="Superuser?" 

408 ) 

409 must_change_password = Column( 

410 "must_change_password", Boolean, 

411 default=False, 

412 comment="Must change password at next webview login" 

413 ) 

414 when_agreed_terms_of_use = Column( 

415 "when_agreed_terms_of_use", PendulumDateTimeAsIsoTextColType, 

416 comment="Date/time this user acknowledged the Terms and " 

417 "Conditions of Use (ISO 8601)" 

418 ) 

419 upload_group_id = Column( 

420 "upload_group_id", Integer, ForeignKey("_security_groups.id"), 

421 comment="ID of the group to which this user uploads at present", 

422 # OK to be NULL in the database, but the user will not be able to 

423 # upload while it is. 

424 ) 

425 language = Column( 

426 "language", LanguageCodeColType, 

427 comment="Language code preferred by this user" 

428 ) 

429 auto_generated = Column( 

430 "auto_generated", Boolean, 

431 nullable=False, 

432 default=False, 

433 comment="Is automatically generated user with random password" 

434 ) 

435 single_patient_pk = Column( 

436 "single_patient_pk", Integer, ForeignKey("patient._pk", 

437 ondelete="SET NULL", 

438 use_alter=True), 

439 comment="For users locked to a single patient, the server PK of the " 

440 "server-created patient with which they are associated" 

441 ) 

442 

443 # ------------------------------------------------------------------------- 

444 # Relationships 

445 # ------------------------------------------------------------------------- 

446 

447 user_group_memberships = relationship( 

448 "UserGroupMembership", 

449 back_populates="user") # type: List[UserGroupMembership] 

450 groups = association_proxy( 

451 "user_group_memberships", "group") # type: List[Group] 

452 upload_group = relationship( 

453 "Group", foreign_keys=[upload_group_id]) # type: Optional[Group] 

454 single_patient = relationship( 

455 "Patient", foreign_keys=[single_patient_pk]) # type: Optional[Patient] 

456 

457 def __repr__(self) -> str: 

458 return simple_repr( 

459 self, 

460 ["id", "username", "fullname"], 

461 with_addr=True 

462 ) 

463 

464 @classmethod 

465 def get_user_by_id(cls, 

466 dbsession: SqlASession, 

467 user_id: Optional[int]) -> Optional['User']: 

468 """ 

469 Returns a User from their integer ID, or ``None``. 

470 """ 

471 if user_id is None: 

472 return None 

473 return dbsession.query(cls).filter(cls.id == user_id).first() 

474 

475 @classmethod 

476 def get_user_by_name(cls, 

477 dbsession: SqlASession, 

478 username: str) -> Optional['User']: 

479 """ 

480 Returns a User from their username, or ``None``. 

481 """ 

482 if not username: 

483 return None 

484 return dbsession.query(cls).filter(cls.username == username).first() 

485 

486 @classmethod 

487 def user_exists(cls, req: "CamcopsRequest", username: str) -> bool: 

488 """ 

489 Does a user exist with this username? 

490 """ 

491 if not username: 

492 return False 

493 dbsession = req.dbsession 

494 return exists_orm(dbsession, cls, cls.username == username) 

495 

496 @classmethod 

497 def create_superuser(cls, req: "CamcopsRequest", username: str, 

498 password: str) -> bool: 

499 """ 

500 Creates a superuser. 

501 

502 Will fail if the user already exists. 

503 

504 Args: 

505 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

506 username: the new superuser's username 

507 password: the new superuser's password 

508 

509 Returns: 

510 success? 

511 

512 """ 

513 assert username, "Can't create superuser with no name" 

514 assert username != USER_NAME_FOR_SYSTEM, ( 

515 f"Can't create user with name {USER_NAME_FOR_SYSTEM!r}") 

516 dbsession = req.dbsession 

517 user = cls.get_user_by_name(dbsession, username) 

518 if user: 

519 # already exists! 

520 return False 

521 # noinspection PyArgumentList 

522 user = cls(username=username) # does work! 

523 user.superuser = True 

524 audit(req, "SUPERUSER CREATED: " + user.username, from_console=True) 

525 user.set_password(req, password) # will audit 

526 user.language = req.language # a reasonable default 

527 dbsession.add(user) 

528 return True 

529 

530 @classmethod 

531 def get_username_from_id(cls, req: "CamcopsRequest", 

532 user_id: int) -> Optional[str]: 

533 """ 

534 Looks up a user from their integer ID and returns their name, if found. 

535 """ 

536 dbsession = req.dbsession 

537 return dbsession.query(cls.username)\ 

538 .filter(cls.id == user_id)\ 

539 .first()\ 

540 .scalar() 

541 

542 @classmethod 

543 def get_user_from_username_password( 

544 cls, 

545 req: "CamcopsRequest", 

546 username: str, 

547 password: str, 

548 take_time_for_nonexistent_user: bool = True) -> Optional['User']: 

549 """ 

550 Retrieve a User object from the supplied username, if the password is 

551 correct; otherwise, return None. 

552 

553 Args: 

554 req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

555 username: the username 

556 password: the password attempt 

557 take_time_for_nonexistent_user: if ``True`` (the default), then 

558 even if the user doesn't exist, we take some time to mimic 

559 the time we spend doing deliberately wasteful password 

560 encryption (to prevent attackers from discovering real 

561 usernames via timing attacks). 

562 """ 

563 dbsession = req.dbsession 

564 user = cls.get_user_by_name(dbsession, username) 

565 if user is None: 

566 if take_time_for_nonexistent_user: 

567 # If the user really existed, we'd be running a somewhat 

568 # time-consuming bcrypt operation. So that attackers can't 

569 # identify fake users easily based on timing, we consume some 

570 # time: 

571 cls.take_some_time_mimicking_password_encryption() 

572 return None 

573 if not user.is_password_correct(password): 

574 return None 

575 return user 

576 

577 @classmethod 

578 def get_system_user(cls, dbsession: SqlASession) -> "User": 

579 """ 

580 Returns a user representing "command-line access". 

581 """ 

582 user = cls.get_user_by_name(dbsession, USER_NAME_FOR_SYSTEM) 

583 if not user: 

584 # noinspection PyArgumentList 

585 user = cls(username=USER_NAME_FOR_SYSTEM) # does work! 

586 dbsession.add(user) 

587 user.fullname = "CamCOPS system user" 

588 user.superuser = True 

589 user.hashedpw = '' # because it's not nullable 

590 # ... note that no password will hash to '', in addition to the fact 

591 # that the system will not allow logon attempts for this user! 

592 return user 

593 

594 @staticmethod 

595 def is_username_permissible(username: str) -> bool: 

596 """ 

597 Is this a permissible username? 

598 """ 

599 return bool(re.match(VALID_USERNAME_REGEX, username)) 

600 

601 @staticmethod 

602 def take_some_time_mimicking_password_encryption() -> None: 

603 """ 

604 Waste some time. We use this when an attempt has been made to log in 

605 with a nonexistent user; we know the user doesn't exist very quickly, 

606 but we mimic the time it takes to check a real user's password. 

607 """ 

608 rnc_crypto.hash_password("dummy!", BCRYPT_DEFAULT_LOG_ROUNDS) 

609 

610 def set_password(self, req: "CamcopsRequest", new_password: str) -> None: 

611 """ 

612 Set a user's password. 

613 """ 

614 self.hashedpw = rnc_crypto.hash_password(new_password, 

615 BCRYPT_DEFAULT_LOG_ROUNDS) 

616 self.last_password_change_utc = req.now_utc_no_tzinfo 

617 self.must_change_password = False 

618 audit(req, "Password changed for user " + self.username) 

619 

620 def is_password_correct(self, password: str) -> bool: 

621 """ 

622 Is the supplied password valid for this user? 

623 """ 

624 return rnc_crypto.is_password_valid(password, self.hashedpw) 

625 

626 def force_password_change(self) -> None: 

627 """ 

628 Make the user change their password at next login. 

629 """ 

630 self.must_change_password = True 

631 

632 def login(self, req: "CamcopsRequest") -> None: 

633 """ 

634 Called when the framework has determined a successful login. 

635 

636 Clears any login failures. 

637 Requires the user to change their password if policies say they should. 

638 """ 

639 self.clear_login_failures(req) 

640 self.set_password_change_flag_if_necessary(req) 

641 self.last_login_at_utc = req.now_utc_no_tzinfo 

642 

643 def set_password_change_flag_if_necessary(self, 

644 req: "CamcopsRequest") -> None: 

645 """ 

646 If we're requiring users to change their passwords, then check to 

647 see if they must do so now. 

648 """ 

649 if self.must_change_password: 

650 # already required, pointless to check again 

651 return 

652 cfg = req.config 

653 if cfg.password_change_frequency_days <= 0: 

654 # changes never required 

655 return 

656 if not self.last_password_change_utc: 

657 # we don't know when the last change was, so it's overdue 

658 self.force_password_change() 

659 return 

660 delta = req.now_utc_no_tzinfo - self.last_password_change_utc 

661 # Must use a version of "now" with no timezone info, since 

662 # self.last_password_change_utc is "offset-naive" (has no timezone 

663 # info) 

664 if delta.days >= cfg.password_change_frequency_days: 

665 self.force_password_change() 

666 

667 @property 

668 def must_agree_terms(self) -> bool: 

669 """ 

670 Does the user still need to agree the terms/conditions of use? 

671 """ 

672 if self.when_agreed_terms_of_use is None: 

673 # User hasn't agreed yet. 

674 return True 

675 if self.when_agreed_terms_of_use.date() < TERMS_CONDITIONS_UPDATE_DATE: 

676 # User hasn't agreed since the terms were updated. 

677 return True 

678 return False 

679 

680 def agree_terms(self, req: "CamcopsRequest") -> None: 

681 """ 

682 Mark the user as having agreed to the terms/conditions of use now. 

683 """ 

684 self.when_agreed_terms_of_use = req.now 

685 

686 def clear_login_failures(self, req: "CamcopsRequest") -> None: 

687 """ 

688 Clear login failures. 

689 """ 

690 if not self.username: 

691 return 

692 SecurityLoginFailure.clear_login_failures(req, self.username) 

693 

694 def is_locked_out(self, req: "CamcopsRequest") -> bool: 

695 """ 

696 Is the user locked out because of multiple login failures? 

697 """ 

698 return SecurityAccountLockout.is_user_locked_out(req, self.username) 

699 

700 def locked_out_until(self, 

701 req: "CamcopsRequest") -> Optional[Pendulum]: 

702 """ 

703 When is the user locked out until? 

704 

705 Returns a Pendulum datetime in local timezone (or ``None`` if the 

706 user isn't locked out). 

707 """ 

708 return SecurityAccountLockout.user_locked_out_until(req, 

709 self.username) 

710 

711 def enable(self, req: "CamcopsRequest") -> None: 

712 """ 

713 Re-enables the user, unlocking them and clearing login failures. 

714 """ 

715 SecurityLoginFailure.enable_user(req, self.username) 

716 

717 @property 

718 def may_login_as_tablet(self) -> bool: 

719 """ 

720 May the user login via the client (tablet) API? 

721 """ 

722 return self.may_upload or self.may_register_devices 

723 

724 @property 

725 def group_ids(self) -> List[int]: 

726 """ 

727 Return a list of group IDs for all the groups that the user is a member 

728 of. 

729 """ 

730 return sorted(list(g.id for g in self.groups)) 

731 

732 @property 

733 def group_names(self) -> List[str]: 

734 """ 

735 Returns a list of group names for all the groups that the user is a 

736 member of. 

737 """ 

738 return sorted(list(g.name for g in self.groups)) 

739 

740 def set_group_ids(self, group_ids: List[int]) -> None: 

741 """ 

742 Set the user's groups to the groups whose integer IDs are in the 

743 ``group_ids`` list, and remove the user from any other groups. 

744 """ 

745 dbsession = SqlASession.object_session(self) 

746 assert dbsession, ("User.set_group_ids() called on a User that's not " 

747 "yet in a session") 

748 # groups = Group.get_groups_from_id_list(dbsession, group_ids) 

749 

750 # Remove groups that no longer apply 

751 for m in self.user_group_memberships: 

752 if m.group_id not in group_ids: 

753 dbsession.delete(m) 

754 # Add new groups 

755 current_group_ids = [m.group_id for m in self.user_group_memberships] 

756 new_group_ids = [gid for gid in group_ids 

757 if gid not in current_group_ids] 

758 for gid in new_group_ids: 

759 self.user_group_memberships.append(UserGroupMembership( 

760 user_id=self.id, 

761 group_id=gid, 

762 )) 

763 

764 @property 

765 def ids_of_groups_user_may_see(self) -> List[int]: 

766 """ 

767 Return a list of group IDs for groups that the user may see data 

768 from. (That means the groups the user is in, plus any other groups that 

769 the user's groups are authorized to see.) 

770 """ 

771 # Incidentally: "list_a += list_b" vs "list_a.extend(list_b)": 

772 # https://stackoverflow.com/questions/3653298/concatenating-two-lists-difference-between-and-extend # noqa 

773 # ... not much difference; perhaps += is slightly better (also clearer) 

774 # And relevant set operations: 

775 # https://stackoverflow.com/questions/4045403/python-how-to-add-the-contents-of-an-iterable-to-a-set # noqa 

776 # 

777 # Process as a set rather than a list, to eliminate duplicates: 

778 group_ids = set() # type: Set[int] 

779 for my_group in self.groups: # type: Group 

780 group_ids.update(my_group.ids_of_groups_group_may_see()) 

781 return list(group_ids) 

782 # Return as a list rather than a set, because SQLAlchemy's in_() 

783 # operator only likes lists and ?tuples. 

784 

785 @property 

786 def ids_of_groups_user_may_dump(self) -> List[int]: 

787 """ 

788 Return a list of group IDs for groups that the user may dump data 

789 from. 

790 

791 See also :meth:`groups_user_may_dump`. 

792 

793 This does **not** give "second-hand authority" to dump. For example, 

794 if group G1 can "see" G2, and user U has authority to dump G1, that 

795 authority does not extend to G2. 

796 """ 

797 if self.superuser: 

798 return Group.all_group_ids( 

799 dbsession=SqlASession.object_session(self)) 

800 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

801 return [m.group_id for m in memberships if m.may_dump_data] 

802 

803 @property 

804 def ids_of_groups_user_may_report_on(self) -> List[int]: 

805 """ 

806 Returns a list of group IDs for groups that the user may run reports 

807 on. 

808 

809 This does **not** give "second-hand authority" to dump. For example, 

810 if group G1 can "see" G2, and user U has authority to report on G1, 

811 that authority does not extend to G2. 

812 """ 

813 if self.superuser: 

814 return Group.all_group_ids( 

815 dbsession=SqlASession.object_session(self)) 

816 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

817 return [m.group_id for m in memberships if m.may_run_reports] 

818 

819 @property 

820 def ids_of_groups_user_is_admin_for(self) -> List[int]: 

821 """ 

822 Returns a list of group IDs for groups that the user is an 

823 administrator for. 

824 """ 

825 if self.superuser: 

826 return Group.all_group_ids( 

827 dbsession=SqlASession.object_session(self)) 

828 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

829 return [m.group_id for m in memberships if m.groupadmin] 

830 

831 @property 

832 def names_of_groups_user_is_admin_for(self) -> List[str]: 

833 """ 

834 Returns a list of group names for groups that the user is an 

835 administrator for. 

836 """ 

837 if self.superuser: 

838 return Group.all_group_names( 

839 dbsession=SqlASession.object_session(self)) 

840 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

841 return [m.group.name for m in memberships if m.groupadmin] 

842 

843 @property 

844 def names_of_groups_user_is_admin_for_csv(self) -> str: 

845 """ 

846 Returns a list of group names for groups that the user is an 

847 administrator for. 

848 """ 

849 names = sorted(self.names_of_groups_user_is_admin_for) 

850 return ", ".join(names) 

851 

852 def may_administer_group(self, group_id: int) -> bool: 

853 """ 

854 May this user administer the group identified by ``group_id``? 

855 """ 

856 if self.superuser: 

857 return True 

858 return group_id in self.ids_of_groups_user_is_admin_for 

859 

860 @property 

861 def groups_user_may_see(self) -> List[Group]: 

862 """ 

863 Returns a list of :class:`camcops_server.cc_modules.cc_group.Group` 

864 objects for groups the user can see. 

865 

866 Less efficient than the group ID version; for visual display (see 

867 ``view_own_user_info.mako``). 

868 

869 """ 

870 groups = set(self.groups) # type: Set[Group] 

871 for my_group in self.groups: # type: Group 

872 groups.update(set(my_group.can_see_other_groups)) 

873 return sorted(list(groups), key=lambda g: g.name) 

874 

875 @property 

876 def groups_user_may_dump(self) -> List[Group]: 

877 """ 

878 Returns a list of :class:`camcops_server.cc_modules.cc_group.Group` 

879 objects for groups the user can dump. 

880 

881 For security notes, see :meth:`ids_of_groups_user_may_dump`. 

882 

883 Less efficient than the group ID version (see 

884 :meth:`ids_of_groups_user_may_dump`). This version is for visual 

885 display (see ``view_own_user_info.mako``). 

886 

887 """ 

888 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

889 return sorted([m.group for m in memberships if m.may_dump_data], 

890 key=lambda g: g.name) 

891 

892 @property 

893 def groups_user_may_report_on(self) -> List[Group]: 

894 """ 

895 Returns a list of :class:`camcops_server.cc_modules.cc_group.Group` 

896 objects for groups the user can report on. 

897 

898 For security notes, see :meth:`ids_of_groups_user_may_report_on`. 

899 

900 Less efficient than the group ID version (see 

901 :meth:`ids_of_groups_user_may_report_on`). This version is for visual 

902 display (see ``view_own_user_info.mako``). 

903 

904 """ 

905 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

906 return sorted([m.group for m in memberships if m.may_run_reports], 

907 key=lambda g: g.name) 

908 

909 @property 

910 def groups_user_may_upload_into(self) -> List[Group]: 

911 """ 

912 Returns a list of :class:`camcops_server.cc_modules.cc_group.Group` 

913 objects for groups the user can upload into. 

914 

915 For visual display (see ``view_own_user_info.mako``). 

916 

917 """ 

918 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

919 return sorted([m.group for m in memberships if m.may_upload], 

920 key=lambda g: g.name) 

921 

922 @property 

923 def groups_user_may_add_special_notes(self) -> List[Group]: 

924 """ 

925 Returns a list of :class:`camcops_server.cc_modules.cc_group.Group` 

926 objects for groups the user can add special notes to. 

927 

928 For visual display (see ``view_own_user_info.mako``). 

929 

930 """ 

931 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

932 return sorted([m.group for m in memberships if m.may_add_notes], 

933 key=lambda g: g.name) 

934 

935 @property 

936 def groups_user_may_see_all_pts_when_unfiltered(self) -> List[Group]: 

937 """ 

938 Returns a list of :class:`camcops_server.cc_modules.cc_group.Group` 

939 objects for groups the user can see all patients when unfiltered. 

940 

941 For visual display (see ``view_own_user_info.mako``). 

942 

943 """ 

944 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

945 return sorted([m.group for m in memberships 

946 if m.view_all_patients_when_unfiltered], 

947 key=lambda g: g.name) 

948 

949 @property 

950 def groups_user_is_admin_for(self) -> List[Group]: 

951 """ 

952 Returns a list of :class:`camcops_server.cc_modules.cc_group.Group` 

953 objects for groups the user is an administrator for. 

954 

955 Less efficient than the group ID version; for visual display (see 

956 ``view_own_user_info.mako``). 

957 

958 """ 

959 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

960 return sorted([m.group for m in memberships if m.groupadmin], 

961 key=lambda g: g.name) 

962 

963 @property 

964 def is_a_groupadmin(self) -> bool: 

965 """ 

966 Is the user a specifically defined group administrator (for any group)? 

967 """ 

968 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

969 return any(m.groupadmin for m in memberships) 

970 

971 @property 

972 def authorized_as_groupadmin(self) -> bool: 

973 """ 

974 Is the user authorized as a group administrator for any group (either 

975 by being specifically set as a group administrator, or by being a 

976 superuser)? 

977 """ 

978 return self.superuser or self.is_a_groupadmin 

979 

980 def membership_for_group_id(self, group_id: int) -> UserGroupMembership: 

981 """ 

982 Returns the :class:`UserGroupMembership` object relating this user 

983 to the group identified by ``group_id``. 

984 """ 

985 return next( 

986 (m for m in self.user_group_memberships if m.group_id == group_id), 

987 None 

988 ) 

989 

990 @property 

991 def may_use_webviewer(self) -> bool: 

992 """ 

993 May this user log in to the web front end? 

994 """ 

995 if self.superuser: 

996 return True 

997 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

998 return any(m.may_use_webviewer for m in memberships) 

999 

1000 def authorized_to_add_special_note(self, group_id: int) -> bool: 

1001 """ 

1002 Is this user authorized to add special notes for the group identified 

1003 by ``group_id``? 

1004 """ 

1005 if self.superuser: 

1006 return True 

1007 membership = self.membership_for_group_id(group_id) 

1008 if not membership: 

1009 return False 

1010 return membership.may_add_notes 

1011 

1012 def authorized_to_erase_tasks(self, group_id: int) -> bool: 

1013 """ 

1014 Is this user authorized to erase tasks for the group identified 

1015 by ``group_id``? 

1016 """ 

1017 if self.superuser: 

1018 return True 

1019 membership = self.membership_for_group_id(group_id) 

1020 if not membership: 

1021 return False 

1022 return membership.groupadmin 

1023 

1024 @property 

1025 def authorized_to_dump(self) -> bool: 

1026 """ 

1027 Is the user authorized to dump data (for some group)? 

1028 """ 

1029 if self.superuser: 

1030 return True 

1031 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

1032 return any(m.may_dump_data for m in memberships) 

1033 

1034 @property 

1035 def authorized_for_reports(self) -> bool: 

1036 """ 

1037 Is the user authorized to run reports (for some group)? 

1038 """ 

1039 if self.superuser: 

1040 return True 

1041 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

1042 return any(m.may_run_reports for m in memberships) 

1043 

1044 @property 

1045 def may_view_all_patients_when_unfiltered(self) -> bool: 

1046 """ 

1047 May the user view all patients when no filters are applied (for all 

1048 groups that the user is a member of)? 

1049 """ 

1050 if self.superuser: 

1051 return True 

1052 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

1053 return all(m.view_all_patients_when_unfiltered for m in memberships) 

1054 

1055 @property 

1056 def may_view_no_patients_when_unfiltered(self) -> bool: 

1057 """ 

1058 May the user view *no* patients when no filters are applied? 

1059 """ 

1060 if self.superuser: 

1061 return False 

1062 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

1063 return all(not m.view_all_patients_when_unfiltered 

1064 for m in memberships) 

1065 

1066 def group_ids_that_nonsuperuser_may_see_when_unfiltered(self) -> List[int]: 

1067 """ 

1068 Which group IDs may this user see all patients for, when unfiltered? 

1069 """ 

1070 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

1071 return [m.group_id for m in memberships 

1072 if m.view_all_patients_when_unfiltered] 

1073 

1074 def may_upload_to_group(self, group_id: int) -> bool: 

1075 """ 

1076 May this user upload to the specified group? 

1077 """ 

1078 if self.superuser: 

1079 return True 

1080 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

1081 return any(m.may_upload for m in memberships if m.group_id == group_id) 

1082 

1083 @property 

1084 def may_upload(self) -> bool: 

1085 """ 

1086 May this user upload to the group that is set as their upload group? 

1087 """ 

1088 if self.upload_group_id is None: 

1089 return False 

1090 return self.may_upload_to_group(self.upload_group_id) 

1091 

1092 @property 

1093 def may_register_devices(self) -> bool: 

1094 """ 

1095 May this user register devices? 

1096 

1097 You can register a device if your chosen upload groups allow you to do 

1098 so. (You have to have a chosen group -- even for superusers -- because 

1099 the tablet wants group ID policies at the moment of registration, so we 

1100 have to know which group.) 

1101 """ 

1102 if self.upload_group_id is None: 

1103 return False 

1104 if self.superuser: 

1105 return True 

1106 memberships = self.user_group_memberships # type: List[UserGroupMembership] # noqa 

1107 return any(m.may_register_devices for m in memberships 

1108 if m.group_id == self.upload_group_id) 

1109 

1110 def managed_users(self) -> Optional[Query]: 

1111 """ 

1112 Return a query for all users managed by this user. 

1113 

1114 LOGIC SHOULD MATCH :meth:`may_edit_user`. 

1115 """ 

1116 dbsession = SqlASession.object_session(self) 

1117 if not self.superuser and not self.is_a_groupadmin: 

1118 return dbsession.query(User).filter(false()) 

1119 # https://stackoverflow.com/questions/10345327/sqlalchemy-create-an-intentionally-empty-query # noqa 

1120 q = ( 

1121 dbsession.query(User) 

1122 .filter(User.username != USER_NAME_FOR_SYSTEM) 

1123 .order_by(User.username) 

1124 ) 

1125 if not self.superuser: 

1126 # LOGIC SHOULD MATCH assert_may_edit_user 

1127 # Restrict to users who are members of groups that I am an admin 

1128 # for: 

1129 groupadmin_group_ids = self.ids_of_groups_user_is_admin_for 

1130 # noinspection PyUnresolvedReferences 

1131 ugm2 = UserGroupMembership.__table__.alias("ugm2") 

1132 q = q.join(User.user_group_memberships)\ 

1133 .filter(not_(User.superuser))\ 

1134 .filter(UserGroupMembership.group_id.in_(groupadmin_group_ids))\ 

1135 .filter( 

1136 ~exists().select_from(ugm2).where( 

1137 and_( 

1138 ugm2.c.user_id == User.id, 

1139 ugm2.c.groupadmin 

1140 ) 

1141 ) 

1142 ) 

1143 # ... no superusers 

1144 # ... user must be a member of one of our groups 

1145 # ... no groupadmins 

1146 # https://stackoverflow.com/questions/14600619/using-not-exists-clause-in-sqlalchemy-orm-query # noqa 

1147 return q 

1148 

1149 def may_edit_user(self, req: "CamcopsRequest", 

1150 other: "User") -> Tuple[bool, str]: 

1151 """ 

1152 May the ``self`` user edit the ``other`` user? 

1153 

1154 Args: 

1155 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

1156 other: the user to be edited (potentially) 

1157 

1158 Returns: 

1159 tuple: may_edit (bool), reason_why_not (str) 

1160 

1161 LOGIC SHOULD MATCH :meth:`managed_users`. 

1162 """ 

1163 _ = req.gettext 

1164 if other.username == USER_NAME_FOR_SYSTEM: 

1165 return False, _("Nobody may edit the system user") 

1166 if not self.superuser: 

1167 if other.superuser: 

1168 return False, _("You may not edit a superuser") 

1169 if other.is_a_groupadmin: 

1170 return False, _("You may not edit a group administrator") 

1171 groupadmin_group_ids = self.ids_of_groups_user_is_admin_for 

1172 if not any(gid in groupadmin_group_ids for gid in other.group_ids): 

1173 return False, _("You are not a group administrator for any " 

1174 "groups that this user is in") 

1175 return True, "" 

1176 

1177 

1178def set_password_directly(req: "CamcopsRequest", 

1179 username: str, password: str) -> bool: 

1180 """ 

1181 If the user exists, set its password. Returns Boolean success. 

1182 Used from the command line. 

1183 """ 

1184 dbsession = req.dbsession 

1185 user = User.get_user_by_name(dbsession, username) 

1186 if not user: 

1187 return False 

1188 user.set_password(req, password) 

1189 user.enable(req) 

1190 audit(req, "Password changed for user " + user.username, from_console=True) 

1191 return True