Coverage for cc_modules/cc_user.py: 45%

526 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 15:51 +0100

1""" 

2camcops_server/cc_modules/cc_user.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**CamCOPS users.** 

27 

28""" 

29 

30import datetime 

31import logging 

32import re 

33from typing import Any, List, Optional, Set, Tuple, TYPE_CHECKING 

34 

35import cardinal_pythonlib.crypto as rnc_crypto 

36from cardinal_pythonlib.datetimefunc import convert_datetime_to_local 

37from cardinal_pythonlib.logs import BraceStyleAdapter 

38from cardinal_pythonlib.reprfunc import simple_repr 

39from cardinal_pythonlib.sqlalchemy.orm_query import ( 

40 CountStarSpecializedQuery, 

41 exists_orm, 

42) 

43from pendulum import DateTime as Pendulum 

44import phonenumbers 

45import pyotp 

46from sqlalchemy import text 

47from sqlalchemy.ext.associationproxy import association_proxy 

48from sqlalchemy.orm import ( 

49 Mapped, 

50 mapped_column, 

51 relationship, 

52 Session as SqlASession, 

53 Query, 

54) 

55from sqlalchemy.sql import false 

56from sqlalchemy.sql.expression import and_, exists, not_ 

57from sqlalchemy.sql.functions import func 

58from sqlalchemy.sql.schema import ForeignKey 

59 

60from camcops_server.cc_modules.cc_audit import audit 

61from camcops_server.cc_modules.cc_constants import ( 

62 MfaMethod, 

63 OBSCURE_EMAIL_ASTERISKS, 

64 OBSCURE_PHONE_ASTERISKS, 

65 USER_NAME_FOR_SYSTEM, 

66) 

67from camcops_server.cc_modules.cc_group import Group 

68from camcops_server.cc_modules.cc_membership import UserGroupMembership 

69from camcops_server.cc_modules.cc_sqla_coltypes import ( 

70 Base32ColType, 

71 EmailAddressColType, 

72 FullNameColType, 

73 HashedPasswordColType, 

74 LanguageCodeColType, 

75 MfaMethodColType, 

76 PendulumDateTimeAsIsoTextColType, 

77 PhoneNumberColType, 

78 UserNameCamcopsColType, 

79) 

80from camcops_server.cc_modules.cc_sqlalchemy import Base 

81from camcops_server.cc_modules.cc_text import TERMS_CONDITIONS_UPDATE_DATE 

82 

83if TYPE_CHECKING: 

84 from camcops_server.cc_modules.cc_patient import Patient 

85 from camcops_server.cc_modules.cc_request import CamcopsRequest 

86 

# Module-level logger, wrapped in cardinal_pythonlib's BraceStyleAdapter.
log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Constants
# =============================================================================

# Type alias: a list of user/group membership records.
_TYPE_LUGM = List[UserGroupMembership]

# Pattern for permissible usernames: one or more ASCII letters, digits,
# underscores or hyphens. Note that "$" also matches just before a trailing
# newline, so code wanting a strict check should match the whole string.
VALID_USERNAME_REGEX = "^[A-Za-z0-9_-]+$"

# Work factor (log2 of the number of rounds) passed to
# rnc_crypto.hash_password() whenever this module hashes a password.
BCRYPT_DEFAULT_LOG_ROUNDS = 6
# Default is 12, but it does impact on the tablet upload speed (cost per
# transaction). Time is expected to be proportional to 2^n, i.e. incrementing 1
# increases time by a factor of 2.
# Empirically, on egret:
#   2^12 rounds takes around 400 ms
#   2^8 rounds takes around 30 ms (as expected, 1/16 of the time as for 12)
#   we'd like around 8 ms; http://security.stackexchange.com/questions/17207
#   ... so we should be using 12 + log(8/400)/log(2) = 6 rounds

# Minimum interval between clearances of the login failures that were recorded
# against nonexistent usernames; see
# SecurityLoginFailure.clear_dummy_login_failures_if_necessary().
CLEAR_DUMMY_LOGIN_FREQUENCY_DAYS = 7
CLEAR_DUMMY_LOGIN_PERIOD = datetime.timedelta(
    days=CLEAR_DUMMY_LOGIN_FREQUENCY_DAYS
)

111 

112 

113# ============================================================================= 

114# SecurityAccountLockout 

115# ============================================================================= 

116# Note that we record login failures for non-existent users, and pretend 

117# they're locked out (to prevent username discovery that way, by timing) 

118 

119 

class SecurityAccountLockout(Base):
    """
    A time-limited "lockout" applied to a username.

    Lockouts are keyed by username rather than by user ID, so that they can
    also be recorded for usernames that do not belong to any real user.
    """

    __tablename__ = "_security_account_lockouts"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    username: Mapped[str] = mapped_column(
        "username",
        UserNameCamcopsColType,
        index=True,
        comment="User name (which may be a non-existent user, to prevent "
        "subtle username discovery by careful timing)",
    )
    locked_until: Mapped[datetime.datetime] = mapped_column(
        index=True,
        comment="Account is locked until (UTC)",
    )

    @classmethod
    def delete_old_account_lockouts(cls, req: "CamcopsRequest") -> None:
        """
        Remove every lockout whose expiry time is at or before the request's
        current UTC time.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        expired = req.dbsession.query(cls).filter(
            cls.locked_until <= req.now_utc
        )
        # Bulk delete; no need to synchronize objects already in the session.
        expired.delete(synchronize_session=False)

    @classmethod
    def is_user_locked_out(cls, req: "CamcopsRequest", username: str) -> bool:
        """
        Report whether an unexpired lockout exists for this username.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        session = req.dbsession
        utc_now = req.now_utc
        criteria = (cls.username == username, cls.locked_until > utc_now)
        return exists_orm(session, cls, *criteria)  # type: ignore[arg-type]

    @classmethod
    def user_locked_out_until(
        cls, req: "CamcopsRequest", username: str
    ) -> Optional[Pendulum]:
        """
        Find the latest expiry time among this username's unexpired lockouts.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username

        Returns:
            Pendulum datetime in local timezone (or ``None`` if not
            locked out).
        """
        session = req.dbsession
        utc_now = req.now_utc
        latest_expiry_query = session.query(
            func.max(cls.locked_until)
        ).filter(cls.username == username, cls.locked_until > utc_now)
        # scalar() yields the bare value, whereas first() would yield a
        # one-element row.
        latest_expiry = (
            latest_expiry_query.scalar()
        )  # type: Optional[Pendulum]
        if latest_expiry:
            return convert_datetime_to_local(latest_expiry)
        return None

    @classmethod
    def lock_user_out(
        cls, req: "CamcopsRequest", username: str, lockout_minutes: int
    ) -> None:
        """
        Add a lockout that expires ``lockout_minutes`` from now, and audit it.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
            lockout_minutes: number of minutes
        """
        session = req.dbsession
        expiry = req.now_utc + datetime.timedelta(minutes=lockout_minutes)
        # noinspection PyArgumentList
        session.add(cls(username=username, locked_until=expiry))
        message = f"Account {username} locked out for {lockout_minutes} minutes"
        audit(req, message)

    @classmethod
    def unlock_user(cls, req: "CamcopsRequest", username: str) -> None:
        """
        Delete all lockouts (expired or not) recorded for this username.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        lockouts_for_user = req.dbsession.query(cls).filter(
            cls.username == username
        )
        lockouts_for_user.delete(synchronize_session=False)

229 

230 

231# ============================================================================= 

232# SecurityLoginFailure 

233# ============================================================================= 

234 

235 

class SecurityLoginFailure(Base):
    """
    One row per failed login attempt.

    When enough failures accumulate for a username, a lockout is created; see
    :class:`SecurityAccountLockout`.
    """

    __tablename__ = "_security_login_failures"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    username: Mapped[str] = mapped_column(
        UserNameCamcopsColType,
        index=True,
        comment="User name (which may be a non-existent user, to prevent "
        "subtle username discovery by careful timing)",
    )
    login_failure_at: Mapped[datetime.datetime] = mapped_column(
        index=True,
        comment="Login failure occurred at (UTC)",
    )

    @classmethod
    def record_login_failure(
        cls, req: "CamcopsRequest", username: str
    ) -> None:
        """
        Store a new failure row for this username, timestamped with the
        request's current UTC time.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        session = req.dbsession
        # noinspection PyArgumentList
        session.add(cls(username=username, login_failure_at=req.now_utc))

    @classmethod
    def act_on_login_failure(
        cls, req: "CamcopsRequest", username: str
    ) -> None:
        """
        Audit and record a failed login, then lock the username out if the
        failure count has just reached a multiple of the configured threshold.

        The lockout duration grows with the number of thresholds reached:
        ``nlockouts * lockout_duration_increment_minutes``.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        cfg = req.config
        audit(req, f"Failed login as user: {username}")
        cls.record_login_failure(req, username)
        failure_count = cls.how_many_login_failures(req, username)
        nlockouts, since_last_lockout = divmod(
            failure_count, cfg.lockout_threshold
        )
        if nlockouts < 1 or since_last_lockout != 0:
            # Threshold not (newly) reached; nothing more to do.
            return
        SecurityAccountLockout.lock_user_out(
            req, username, nlockouts * cfg.lockout_duration_increment_minutes
        )

    @classmethod
    def clear_login_failures(
        cls, req: "CamcopsRequest", username: str
    ) -> None:
        """
        Delete every failure row recorded for this username.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        failures_for_user = req.dbsession.query(cls).filter(
            cls.username == username
        )
        failures_for_user.delete(synchronize_session=False)

    @classmethod
    def how_many_login_failures(
        cls, req: "CamcopsRequest", username: str
    ) -> int:
        """
        Count the failure rows currently stored for this username.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        counting_query = CountStarSpecializedQuery(
            cls, session=req.dbsession  # type: ignore[arg-type]
        ).filter(cls.username == username)
        return counting_query.count_star()

    @classmethod
    def enable_user(cls, req: "CamcopsRequest", username: str) -> None:
        """
        Remove the username's lockouts and failure records, and audit the
        re-enabling.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        SecurityAccountLockout.unlock_user(req, username)
        cls.clear_login_failures(req, username)
        audit(req, f"User {username} re-enabled")

    @classmethod
    def clear_login_failures_for_nonexistent_users(
        cls, req: "CamcopsRequest"
    ) -> None:
        """
        Delete failure rows whose username matches no row in the user table.

        Login failures are recorded for nonexistent users to mimic the lockout
        seen for real users, i.e. to reduce the potential for username
        discovery.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        session = req.dbsession
        real_usernames = session.query(User.username)
        # https://stackoverflow.com/questions/26182027/how-to-use-not-in-clause-in-sqlalchemy-orm-query  # noqa
        orphaned_failures = session.query(cls).filter(
            cls.username.notin_(real_usernames)
        )
        orphaned_failures.delete(synchronize_session=False)

    @classmethod
    def clear_dummy_login_failures_if_necessary(
        cls, req: "CamcopsRequest"
    ) -> None:
        """
        Clear failures for nonexistent users, unless that was last done less
        than :data:`CLEAR_DUMMY_LOGIN_PERIOD` ago; then record the clearance
        time in the server settings.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        now = req.now_utc
        settings = req.server_settings
        last_cleared = (
            settings.get_last_dummy_login_failure_clearance_pendulum()
        )
        cleared_recently = (
            last_cleared is not None
            and now - last_cleared < CLEAR_DUMMY_LOGIN_PERIOD
        )
        if cleared_recently:
            return

        cls.clear_login_failures_for_nonexistent_users(req)
        log.debug("Dummy login failures cleared.")
        settings.last_dummy_login_failure_clearance_at_utc = now

394 

395 

396# ============================================================================= 

397# User class 

398# ============================================================================= 

399 

400 

401class User(Base): 

402 """ 

403 Class representing a user. 

404 """ 

405 

406 __tablename__ = "_security_users" 

407 

408 # ------------------------------------------------------------------------- 

409 # Columns 

410 # ------------------------------------------------------------------------- 

411 

412 id: Mapped[int] = mapped_column( 

413 primary_key=True, 

414 autoincrement=True, 

415 index=True, 

416 comment="User ID", 

417 ) 

418 username: Mapped[str] = mapped_column( 

419 UserNameCamcopsColType, 

420 index=True, 

421 unique=True, 

422 comment="User name", 

423 ) 

424 fullname: Mapped[Optional[str]] = mapped_column( 

425 FullNameColType, comment="User's full name" 

426 ) 

427 email: Mapped[Optional[str]] = mapped_column( 

428 EmailAddressColType, comment="User's e-mail address" 

429 ) 

430 phone_number: Mapped[Optional[phonenumbers.PhoneNumber]] = mapped_column( 

431 PhoneNumberColType, comment="User's phone number" 

432 ) 

433 hashedpw: Mapped[str] = mapped_column( 

434 HashedPasswordColType, 

435 comment="Password hash", 

436 ) 

437 mfa_secret_key: Mapped[Optional[str]] = mapped_column( 

438 Base32ColType, 

439 comment="Secret key used for multi-factor authentication", 

440 ) 

441 mfa_method: Mapped[str] = mapped_column( 

442 MfaMethodColType, 

443 server_default=MfaMethod.NO_MFA, 

444 comment="Preferred method of multi-factor authentication", 

445 ) 

446 hotp_counter: Mapped[int] = mapped_column( 

447 server_default=text("0"), 

448 comment="Counter used for HOTP authentication", 

449 ) 

450 last_login_at_utc: Mapped[Optional[datetime.datetime]] = mapped_column( 

451 comment="Date/time this user last logged in (UTC)", 

452 ) 

453 last_password_change_utc: Mapped[Optional[datetime.datetime]] = ( 

454 mapped_column( 

455 comment="Date/time this user last changed their password (UTC)", 

456 ) 

457 ) 

458 superuser: Mapped[Optional[bool]] = mapped_column( 

459 default=False, comment="Superuser?" 

460 ) 

461 must_change_password: Mapped[Optional[bool]] = mapped_column( 

462 default=False, 

463 comment="Must change password at next webview login", 

464 ) 

465 when_agreed_terms_of_use: Mapped[Optional[Pendulum]] = mapped_column( 

466 PendulumDateTimeAsIsoTextColType, 

467 comment="Date/time this user acknowledged the Terms and " 

468 "Conditions of Use (ISO 8601)", 

469 ) 

470 upload_group_id: Mapped[Optional[int]] = mapped_column( 

471 ForeignKey("_security_groups.id"), 

472 comment="ID of the group to which this user uploads at present", 

473 # OK to be NULL in the database, but the user will not be able to 

474 # upload while it is. 

475 ) 

476 language: Mapped[Optional[str]] = mapped_column( 

477 LanguageCodeColType, 

478 comment="Language code preferred by this user", 

479 ) 

480 auto_generated: Mapped[bool] = mapped_column( 

481 default=False, 

482 comment="Is automatically generated user with random password", 

483 ) 

484 single_patient_pk: Mapped[Optional[int]] = mapped_column( 

485 ForeignKey("patient._pk", ondelete="SET NULL", use_alter=True), 

486 comment="For users locked to a single patient, the server PK of the " 

487 "server-created patient with which they are associated", 

488 ) 

489 

490 # ------------------------------------------------------------------------- 

491 # Relationships 

492 # ------------------------------------------------------------------------- 

493 

494 user_group_memberships: Mapped[list[UserGroupMembership]] = relationship( 

495 "UserGroupMembership", back_populates="user" 

496 ) 

497 groups = association_proxy("user_group_memberships", "group") 

498 upload_group: Mapped[Optional[Group]] = relationship( 

499 "Group", foreign_keys=[upload_group_id] 

500 ) 

501 single_patient: Mapped[Optional["Patient"]] = relationship( 

502 "Patient", foreign_keys=[single_patient_pk] 

503 ) 

504 

505 # ------------------------------------------------------------------------- 

506 # __init__ 

507 # ------------------------------------------------------------------------- 

508 

def __init__(self, **kwargs: Any) -> None:
    """
    Create a user, forwarding all keyword arguments to the base constructor.

    If no ``mfa_method`` keyword is supplied, the attribute is explicitly set
    to ``MfaMethod.NO_MFA``.
    """
    super().__init__(**kwargs)
    # Prevent Python None from being converted to database string 'none'.
    chosen_method = kwargs.get("mfa_method", MfaMethod.NO_MFA)
    self.mfa_method = chosen_method

513 

514 # ------------------------------------------------------------------------- 

515 # String representations 

516 # ------------------------------------------------------------------------- 

517 

def __repr__(self) -> str:
    """
    Debugging representation showing the ID, username and full name (plus the
    object's address).
    """
    shown_attributes = ["id", "username", "fullname"]
    return simple_repr(self, shown_attributes, with_addr=True)

522 

523 # ------------------------------------------------------------------------- 

524 # Lookup methods 

525 # ------------------------------------------------------------------------- 

526 

@classmethod
def get_user_by_id(
    cls, dbsession: SqlASession, user_id: Optional[int]
) -> Optional["User"]:
    """
    Look up a user by integer ID.

    Returns ``None`` if ``user_id`` is ``None`` (without querying) or if no
    user has that ID.
    """
    if user_id is not None:
        matching = dbsession.query(cls).filter(cls.id == user_id)
        return matching.first()
    return None

537 

@classmethod
def get_user_by_name(
    cls, dbsession: SqlASession, username: str
) -> Optional["User"]:
    """
    Look up a user by username.

    Returns ``None`` if ``username`` is empty/falsy (without querying) or if
    no user has that username.
    """
    if username:
        matching = dbsession.query(cls).filter(cls.username == username)
        return matching.first()
    return None

548 

@classmethod
def user_exists(cls, req: "CamcopsRequest", username: str) -> bool:
    """
    Report whether a user with this username exists. An empty/falsy username
    never exists (and is not looked up).
    """
    if username:
        name_matches = cls.username == username
        return exists_orm(req.dbsession, cls, name_matches)  # type: ignore[arg-type]  # noqa: E501
    return False

558 

@classmethod
def create_superuser(
    cls, req: "CamcopsRequest", username: str, password: str
) -> bool:
    """
    Create a new superuser with the given username and password, and add it
    to the request's database session.

    The username must be non-empty and must not be the reserved system
    username (both are checked by assertion). The new user's language is
    taken from the request.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        username: the new superuser's username
        password: the new superuser's password

    Returns:
        ``True`` if the user was created; ``False`` if a user with that name
        already exists (in which case nothing is changed).
    """
    assert username, "Can't create superuser with no name"
    assert (
        username != USER_NAME_FOR_SYSTEM
    ), f"Can't create user with name {USER_NAME_FOR_SYSTEM!r}"
    session = req.dbsession
    if cls.get_user_by_name(session, username):
        # Name already taken; refuse.
        return False
    # noinspection PyArgumentList
    new_superuser = cls(username=username)
    new_superuser.superuser = True
    audit(
        req,
        "SUPERUSER CREATED: " + new_superuser.username,
        from_console=True,
    )
    new_superuser.set_password(req, password)  # audits the password change
    new_superuser.language = req.language  # a reasonable default
    session.add(new_superuser)
    return True

594 

@classmethod
def get_username_from_id(
    cls, req: "CamcopsRequest", user_id: int
) -> Optional[str]:
    """
    Looks up a user from their integer ID and returns their name, if found.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        user_id: the user's integer ID

    Returns:
        the username, or ``None`` if no user has that ID
    """
    dbsession = req.dbsession
    # Use Query.scalar() directly: it returns the first column of the single
    # matching row, or None if there is no row. Chaining .first().scalar()
    # is wrong, because first() returns a row (or None), neither of which has
    # a scalar() method.
    return (
        dbsession.query(cls.username)
        .filter(cls.id == user_id)
        .scalar()
    )

609 

@classmethod
def get_user_from_username_password(
    cls,
    req: "CamcopsRequest",
    username: str,
    password: str,
    take_time_for_nonexistent_user: bool = True,
) -> Optional["User"]:
    """
    Return the user with this username if the supplied password is correct
    for them; otherwise return ``None``.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        username: the username
        password: the password attempt
        take_time_for_nonexistent_user: if ``True`` (the default), then
            when no such user exists we still perform a dummy password
            hash, so that the time taken resembles that of checking a real
            user's password (making it harder to discover real usernames
            via timing attacks).
    """
    candidate = cls.get_user_by_name(req.dbsession, username)
    if candidate is not None:
        return candidate if candidate.is_password_correct(password) else None
    if take_time_for_nonexistent_user:
        # A real user would have cost us a bcrypt check; spend comparable
        # time here so that fake usernames aren't distinguishable by speed.
        cls.take_some_time_mimicking_password_encryption()
    return None

645 

@classmethod
def get_system_user(cls, dbsession: SqlASession) -> "User":
    """
    Fetch (creating if necessary) the user that represents "command-line
    access", and (re)apply its fixed attributes: full name, superuser status
    and an empty password hash.
    """
    system_user = cls.get_user_by_name(dbsession, USER_NAME_FOR_SYSTEM)
    if not system_user:
        # noinspection PyArgumentList
        system_user = cls(username=USER_NAME_FOR_SYSTEM)
        dbsession.add(system_user)
    system_user.fullname = "CamCOPS system user"
    system_user.superuser = True
    # The column isn't nullable, so store an empty hash. No password hashes
    # to '', and the system refuses logon attempts for this user anyway.
    system_user.hashedpw = ""
    return system_user

662 

663 # ------------------------------------------------------------------------- 

664 # Static methods 

665 # ------------------------------------------------------------------------- 

666 

@staticmethod
def is_username_permissible(username: str) -> bool:
    """
    Is this a permissible username?

    A username is permissible if the whole string matches
    ``VALID_USERNAME_REGEX``.

    Args:
        username: the candidate username

    Returns:
        ``True`` if permissible, ``False`` otherwise
    """
    # re.fullmatch() rather than re.match(): with re.match(), the pattern's
    # trailing "$" also matches just before a final newline, so a username
    # such as "fred\n" would wrongly be accepted.
    return bool(re.fullmatch(VALID_USERNAME_REGEX, username))

673 

@staticmethod
def take_some_time_mimicking_password_encryption() -> None:
    """
    Deliberately spend time hashing a dummy password.

    Used when a login is attempted for a nonexistent user: we find out very
    quickly that the user doesn't exist, so we perform (and discard) a
    password hash to resemble the time taken to check a real user's password.
    """
    dummy_password = "dummy!"
    rnc_crypto.hash_password(dummy_password, BCRYPT_DEFAULT_LOG_ROUNDS)

682 

683 # ------------------------------------------------------------------------- 

684 # Authentication: passwords 

685 # ------------------------------------------------------------------------- 

686 

def set_password(self, req: "CamcopsRequest", new_password: str) -> None:
    """
    Store a hash of the new password, record the time of the change, clear
    the "must change password" flag, and audit the change.
    """
    new_hash = rnc_crypto.hash_password(
        new_password, BCRYPT_DEFAULT_LOG_ROUNDS
    )
    self.hashedpw = new_hash
    self.last_password_change_utc = req.now_utc_no_tzinfo
    self.must_change_password = False
    audit(req, "Password changed for user " + self.username)

697 

def is_password_correct(self, password: str) -> bool:
    """
    Check a password attempt against this user's stored password hash.
    """
    stored_hash = self.hashedpw
    return rnc_crypto.is_password_valid(password, stored_hash)

703 

def force_password_change(self) -> None:
    """
    Make the user change their password at next login.

    Only sets the ``must_change_password`` flag; the flag is cleared again by
    :meth:`set_password`.
    """
    self.must_change_password = True

709 

def set_password_change_flag_if_necessary(
    self, req: "CamcopsRequest"
) -> None:
    """
    Apply the configured password expiry policy: flag the user as needing a
    password change if their last change is unknown or too long ago.

    Does nothing if the flag is already set, or if the configured change
    frequency is zero or negative (meaning changes are never required).
    """
    if self.must_change_password:
        # Already flagged; nothing to work out.
        return
    max_age_days = req.config.password_change_frequency_days
    if max_age_days <= 0:
        # Policy disabled.
        return
    last_change = self.last_password_change_utc
    # An unknown last-change time counts as overdue. Otherwise compare using
    # a timezone-free "now", because the stored value is offset-naive.
    overdue = (
        not last_change
        or (req.now_utc_no_tzinfo - last_change).days >= max_age_days
    )
    if overdue:
        self.force_password_change()

734 

735 # ------------------------------------------------------------------------- 

736 # Authentication: multi-factor authentication 

737 # ------------------------------------------------------------------------- 

738 

def set_mfa_method(self, mfa_method: str) -> None:
    """
    Switch to the given multi-factor authentication (MFA) method, generating
    a fresh secret key and resetting the HOTP counter to zero.

    The method must be one that ``MfaMethod.valid()`` accepts (checked by
    assertion).
    """
    assert MfaMethod.valid(
        mfa_method
    ), f"Invalid MFA method: {mfa_method!r}"

    fresh_secret = pyotp.random_base32()

    self.mfa_method = mfa_method
    self.mfa_secret_key = fresh_secret
    self.hotp_counter = 0

755 

def ensure_mfa_info(self) -> None:
    """
    Repair missing MFA details.

    If the secret key is missing/empty or the HOTP counter is ``None``, reset
    the MFA information via :meth:`set_mfa_method`, using a cleaned version of
    the currently stored method (so an erroneous stored value is replaced by
    a valid one).
    """
    info_complete = (
        bool(self.mfa_secret_key) and self.hotp_counter is not None
    )
    if info_complete:
        return
    self.set_mfa_method(MfaMethod.clean(self.mfa_method))

764 

def verify_one_time_password(self, one_time_password: str) -> bool:
    """
    Determines whether the supplied one-time password is valid for the
    multi-factor authentication (MFA) currently selected.

    Args:
        one_time_password: the code entered by the user

    Returns:
        ``True`` if the code is valid for the user's current MFA method;
        ``False`` if it is not, or if no MFA method requiring a second step
        is selected.

    Raises:
        ValueError: if the stored MFA method requires a second step but is
            not one that this function knows how to verify
    """
    mfa_method = self.mfa_method

    if not MfaMethod.requires_second_step(mfa_method):
        return False

    if mfa_method == MfaMethod.TOTP:
        totp = pyotp.TOTP(self.mfa_secret_key)
        return totp.verify(one_time_password)

    if mfa_method in (MfaMethod.HOTP_EMAIL, MfaMethod.HOTP_SMS):
        hotp = pyotp.HOTP(self.mfa_secret_key)
        # Use the library's verify() rather than comparing against hotp.at()
        # with "==": this matches the TOTP branch and lets pyotp perform the
        # comparison of the secret code (intended to resist timing attacks).
        return hotp.verify(one_time_password, self.hotp_counter)

    raise ValueError(
        f"User.verify_one_time_password(): "
        f"Bad mfa_method = {mfa_method}"
    )

790 

791 # ------------------------------------------------------------------------- 

792 # Authentication: logging in 

793 # ------------------------------------------------------------------------- 

794 

def login(self, req: "CamcopsRequest") -> None:
    """
    Housekeeping for a successful login (as determined by the framework).

    Clears the user's recorded login failures, flags a password change if
    policy requires one, and records the login time.
    """
    self.clear_login_failures(req)
    self.set_password_change_flag_if_necessary(req)
    login_time = req.now_utc_no_tzinfo
    self.last_login_at_utc = login_time

805 

def clear_login_failures(self, req: "CamcopsRequest") -> None:
    """
    Delete this user's recorded login failures. Does nothing if the user has
    no username.
    """
    if self.username:
        SecurityLoginFailure.clear_login_failures(req, self.username)

813 

def is_locked_out(self, req: "CamcopsRequest") -> bool:
    """
    Report whether this user currently has an unexpired lockout (imposed
    after multiple login failures).
    """
    username = self.username
    return SecurityAccountLockout.is_user_locked_out(req, username)

819 

def locked_out_until(self, req: "CamcopsRequest") -> Optional[Pendulum]:
    """
    Find out when this user's lockout ends.

    Returns a Pendulum datetime in local timezone (or ``None`` if the
    user isn't locked out).
    """
    username = self.username
    return SecurityAccountLockout.user_locked_out_until(req, username)

828 

def enable(self, req: "CamcopsRequest") -> None:
    """
    Re-enable this user: remove their lockouts and clear their recorded
    login failures.
    """
    username = self.username
    SecurityLoginFailure.enable_user(req, username)

834 

835 # ------------------------------------------------------------------------- 

836 # Details used for authentication 

837 # ------------------------------------------------------------------------- 

838 

@property
def partial_email(self) -> str:
    """
    The user's e-mail address with most of the "local-part" hidden.

    There is no agreed convention for this. We show the first and last
    characters of the local-part (see
    https://en.wikipedia.org/wiki/Email_address) with asterisks between
    them, followed by the unaltered domain. A one-character local-part
    therefore has its character shown twice.
    """
    # Group 1 is the (non-empty) local-part; group 2 is the domain.
    local_part, domain = re.search(r"^(.+)@(.*)$", self.email).groups()
    first_letter, last_letter = local_part[0], local_part[-1]
    return f"{first_letter}{OBSCURE_EMAIL_ASTERISKS}{last_letter}@{domain}"

857 

@property
def raw_phone_number(self) -> str:
    """
    The user's phone number formatted as E164:
    https://en.wikipedia.org/wiki/E.164
    """
    e164 = phonenumbers.PhoneNumberFormat.E164
    return phonenumbers.format_number(self.phone_number, e164)

867 

@property
def partial_phone_number(self) -> str:
    """
    The user's phone number with all but its last two characters replaced by
    asterisks.

    There is no agreed convention for this either; see
    https://www.karansaini.com/fuzzing-obfuscated-phone-numbers/
    """
    visible_tail = self.raw_phone_number[-2:]
    return f"{OBSCURE_PHONE_ASTERISKS}{visible_tail}"

877 

878 # ------------------------------------------------------------------------- 

879 # Requirements 

880 # ------------------------------------------------------------------------- 

881 

@property
def must_agree_terms(self) -> bool:
    """
    Report whether the user still has to agree to the terms/conditions of use:
    true if they have never agreed, or if they last agreed on a date before
    the terms were updated.
    """
    agreed_at = self.when_agreed_terms_of_use
    never_agreed = agreed_at is None
    return never_agreed or agreed_at.date() < TERMS_CONDITIONS_UPDATE_DATE

894 

def agree_terms(self, req: "CamcopsRequest") -> None:
    """
    Mark the user as having agreed to the terms/conditions of use now.

    The agreement time stored is the request's ``now`` value.
    """
    self.when_agreed_terms_of_use = req.now

900 

def must_set_mfa_method(self, req: "CamcopsRequest") -> bool:
    """
    Report whether the user still has to choose a multi-factor authentication
    method, i.e. whether their current method is absent from the methods
    approved in the current config.
    """
    current_method = self.mfa_method
    approved_methods = req.config.mfa_methods
    return current_method not in approved_methods

908 

909 # ------------------------------------------------------------------------- 

910 # Groups 

911 # ------------------------------------------------------------------------- 

912 

@property
def group_ids(self) -> List[int]:
    """
    IDs of every group the user belongs to, in ascending order.
    """
    ids = [group.id for group in self.groups]
    ids.sort()
    return ids

920 

@property
def group_names(self) -> List[str]:
    """
    Names of every group the user belongs to, sorted.
    """
    names = [group.name for group in self.groups]
    names.sort()
    return names

928 

def set_group_ids(self, group_ids: List[int]) -> None:
    """
    Set the user's groups to the groups whose integer IDs are in the
    ``group_ids`` list, and remove the user from any other groups.

    Existing memberships for groups that remain in ``group_ids`` are left
    untouched. Repeated IDs in ``group_ids`` are treated as a single
    request, so at most one membership is created per group.

    Args:
        group_ids: IDs of the groups the user should belong to.
    """
    dbsession = SqlASession.object_session(self)
    assert dbsession, (
        "User.set_group_ids() called on a User that's not "
        "yet in a session"
    )
    wanted_ids = set(group_ids)

    # Remove groups that no longer apply
    for m in self.user_group_memberships:
        if m.group_id not in wanted_ids:
            dbsession.delete(m)

    # Add new groups. dict.fromkeys() de-duplicates while keeping the
    # caller's ordering, so a repeated ID cannot create two memberships.
    current_group_ids = {m.group_id for m in self.user_group_memberships}
    for gid in dict.fromkeys(group_ids):
        if gid not in current_group_ids:
            self.user_group_memberships.append(
                UserGroupMembership(user_id=self.id, group_id=gid)
            )

954 

@property
def ids_of_groups_user_may_see(self) -> List[int]:
    """
    IDs of the groups whose data the user may see: for each group the user
    is in, whatever that group's ``ids_of_groups_group_may_see()`` reports,
    merged without duplicates.

    The result is a list (in no particular order) rather than a set,
    because SQLAlchemy's ``in_()`` operator wants a list or tuple.
    """
    # Collect into a set so that an ID reachable via several groups
    # appears only once.
    visible_ids = set()  # type: Set[int]
    for member_group in self.groups:  # type: Group
        ids_from_group = member_group.ids_of_groups_group_may_see()
        visible_ids.update(ids_from_group)
    return list(visible_ids)

975 

@property
def ids_of_groups_user_may_dump(self) -> List[int]:
    """
    IDs of the groups the user may dump data from.

    Superusers get every group ID; everyone else gets the groups for which
    their membership has ``may_dump_data`` set.

    See also :meth:`groups_user_may_dump`.

    There is **no** "second-hand authority" to dump: if group G1 can "see"
    G2 and user U may dump G1, that does not let U dump G2.
    """
    if not self.superuser:
        memberships = self.user_group_memberships  # type: _TYPE_LUGM
        return [m.group_id for m in memberships if m.may_dump_data]
    dbsession = SqlASession.object_session(self)
    return Group.all_group_ids(dbsession=dbsession)

994 

@property
def ids_of_groups_user_may_report_on(self) -> List[int]:
    """
    IDs of the groups the user may run reports on.

    Superusers get every group ID; everyone else gets the groups for which
    their membership has ``may_run_reports`` set.

    There is **no** "second-hand authority" to report: if group G1 can
    "see" G2 and user U may report on G1, that does not let U report on G2.
    """
    if not self.superuser:
        memberships = self.user_group_memberships  # type: _TYPE_LUGM
        return [m.group_id for m in memberships if m.may_run_reports]
    dbsession = SqlASession.object_session(self)
    return Group.all_group_ids(dbsession=dbsession)

1011 

@property
def ids_of_groups_user_is_admin_for(self) -> List[int]:
    """
    IDs of the groups the user administers.

    Superusers get every group ID; everyone else gets the groups for which
    their membership has ``groupadmin`` set.
    """
    if not self.superuser:
        memberships = self.user_group_memberships  # type: _TYPE_LUGM
        return [m.group_id for m in memberships if m.groupadmin]
    dbsession = SqlASession.object_session(self)
    return Group.all_group_ids(dbsession=dbsession)

1024 

@property
def ids_of_groups_user_may_manage_patients_in(self) -> List[int]:
    """
    IDs of the groups in which the user may add/edit/delete patients.

    Superusers get every group ID. Otherwise a group qualifies if the
    user's membership has ``may_manage_patients`` set or the user is a
    group administrator for it.
    """
    if not self.superuser:
        memberships = self.user_group_memberships  # type: _TYPE_LUGM
        return [
            m.group_id
            for m in memberships
            if m.may_manage_patients or m.groupadmin
        ]
    dbsession = SqlASession.object_session(self)
    return Group.all_group_ids(dbsession=dbsession)

1041 

@property
def ids_of_groups_user_may_email_patients_in(self) -> List[int]:
    """
    IDs of the groups whose patients the user may send emails to.

    Superusers get every group ID. Otherwise a group qualifies if the
    user's membership has ``may_email_patients`` set or the user is a
    group administrator for it.
    """
    if not self.superuser:
        memberships = self.user_group_memberships  # type: _TYPE_LUGM
        return [
            m.group_id
            for m in memberships
            if m.may_email_patients or m.groupadmin
        ]
    dbsession = SqlASession.object_session(self)
    return Group.all_group_ids(dbsession=dbsession)

1058 

@property
def names_of_groups_user_is_admin_for(self) -> List[str]:
    """
    Names of the groups the user administers.

    Superusers get every group name; everyone else gets the names of the
    groups for which their membership has ``groupadmin`` set.
    """
    if not self.superuser:
        memberships = self.user_group_memberships  # type: _TYPE_LUGM
        return [m.group.name for m in memberships if m.groupadmin]
    dbsession = SqlASession.object_session(self)
    return Group.all_group_names(dbsession=dbsession)

1071 

@property
def names_of_groups_user_is_admin_for_csv(self) -> str:
    """
    The names of the groups the user administers, as a single string:
    sorted, and separated by comma-plus-space.
    """
    return ", ".join(sorted(self.names_of_groups_user_is_admin_for))

1080 

def may_administer_group(self, group_id: int) -> bool:
    """
    Is the user allowed to administer the group with ID ``group_id``?
    Superusers always are.
    """
    if not self.superuser:
        return group_id in self.ids_of_groups_user_is_admin_for
    return True

1088 

def may_manage_patients_in_group(self, group_id: int) -> bool:
    """
    Is the user allowed to manage patients in the group with ID
    ``group_id``? Superusers always are.
    """
    if not self.superuser:
        return group_id in self.ids_of_groups_user_may_manage_patients_in
    return True

1096 

def may_email_patients_in_group(self, group_id: int) -> bool:
    """
    Is the user allowed to send emails to patients in the group with ID
    ``group_id``? Superusers always are.
    """
    if not self.superuser:
        return group_id in self.ids_of_groups_user_may_email_patients_in
    return True

1105 

@property
def groups_user_may_see(self) -> List[Group]:
    """
    The :class:`camcops_server.cc_modules.cc_group.Group` objects the user
    can see -- the user's own groups plus each of those groups'
    ``can_see_other_groups`` -- sorted by group name.

    Less efficient than the group ID version; intended for visual display
    (see ``view_own_user_info.mako``).
    """
    visible = set(self.groups)  # type: Set[Group]
    for member_group in self.groups:  # type: Group
        also_visible = set(member_group.can_see_other_groups)
        visible.update(also_visible)
    return sorted(visible, key=lambda g: g.name)

1120 

@property
def groups_user_may_dump(self) -> List[Group]:
    """
    The :class:`camcops_server.cc_modules.cc_group.Group` objects the user
    can dump, sorted by group name. Only the user's own memberships with
    ``may_dump_data`` set are considered.

    For security notes, see :meth:`ids_of_groups_user_may_dump`.

    Less efficient than :meth:`ids_of_groups_user_may_dump`; this version
    is for visual display (see ``view_own_user_info.mako``).
    """
    memberships = self.user_group_memberships  # type: _TYPE_LUGM
    dumpable = [m.group for m in memberships if m.may_dump_data]
    dumpable.sort(key=lambda g: g.name)
    return dumpable

1139 

@property
def groups_user_may_report_on(self) -> List[Group]:
    """
    The :class:`camcops_server.cc_modules.cc_group.Group` objects the user
    can report on, sorted by group name. Only the user's own memberships
    with ``may_run_reports`` set are considered.

    For security notes, see :meth:`ids_of_groups_user_may_report_on`.

    Less efficient than :meth:`ids_of_groups_user_may_report_on`; this
    version is for visual display (see ``view_own_user_info.mako``).
    """
    memberships = self.user_group_memberships  # type: _TYPE_LUGM
    reportable = [m.group for m in memberships if m.may_run_reports]
    reportable.sort(key=lambda g: g.name)
    return reportable

1158 

@property
def groups_user_may_upload_into(self) -> List[Group]:
    """
    The :class:`camcops_server.cc_modules.cc_group.Group` objects the user
    can upload into (memberships with ``may_upload`` set), sorted by group
    name.

    For visual display (see ``view_own_user_info.mako``).
    """
    memberships = self.user_group_memberships  # type: _TYPE_LUGM
    uploadable = [m.group for m in memberships if m.may_upload]
    uploadable.sort(key=lambda g: g.name)
    return uploadable

1173 

@property
def groups_user_may_add_special_notes(self) -> List[Group]:
    """
    The :class:`camcops_server.cc_modules.cc_group.Group` objects the user
    can add special notes to (memberships with ``may_add_notes`` set),
    sorted by group name.

    For visual display (see ``view_own_user_info.mako``).
    """
    memberships = self.user_group_memberships  # type: _TYPE_LUGM
    notable = [m.group for m in memberships if m.may_add_notes]
    notable.sort(key=lambda g: g.name)
    return notable

1188 

@property
def groups_user_may_see_all_pts_when_unfiltered(self) -> List[Group]:
    """
    The :class:`camcops_server.cc_modules.cc_group.Group` objects for
    which the user can see all patients when unfiltered (memberships with
    ``view_all_patients_when_unfiltered`` set), sorted by group name.

    For visual display (see ``view_own_user_info.mako``).
    """
    memberships = self.user_group_memberships  # type: _TYPE_LUGM
    unrestricted = [
        m.group for m in memberships if m.view_all_patients_when_unfiltered
    ]
    unrestricted.sort(key=lambda g: g.name)
    return unrestricted

1207 

@property
def groups_user_is_admin_for(self) -> List[Group]:
    """
    The :class:`camcops_server.cc_modules.cc_group.Group` objects the user
    is an administrator for (memberships with ``groupadmin`` set), sorted
    by group name.

    Less efficient than the group ID version; for visual display (see
    ``view_own_user_info.mako``).
    """
    memberships = self.user_group_memberships  # type: _TYPE_LUGM
    administered = [m.group for m in memberships if m.groupadmin]
    administered.sort(key=lambda g: g.name)
    return administered

1223 

@property
def groups_user_may_manage_patients_in(self) -> List[Group]:
    """
    The :class:`camcops_server.cc_modules.cc_group.Group` objects the user
    may manage patients in (memberships with ``may_manage_patients`` set),
    sorted by group name.

    NOTE(review): unlike :meth:`ids_of_groups_user_may_manage_patients_in`,
    this does not also count ``groupadmin`` memberships or superuser
    status -- confirm that the difference is intended.
    """
    memberships = self.user_group_memberships  # type: _TYPE_LUGM
    manageable = [m.group for m in memberships if m.may_manage_patients]
    manageable.sort(key=lambda g: g.name)
    return manageable

1235 

@property
def groups_user_may_email_patients_in(self) -> List[Group]:
    """
    The :class:`camcops_server.cc_modules.cc_group.Group` objects whose
    patients the user may send emails to (memberships with
    ``may_email_patients`` set), sorted by group name.

    NOTE(review): unlike :meth:`ids_of_groups_user_may_email_patients_in`,
    this does not also count ``groupadmin`` memberships or superuser
    status -- confirm that the difference is intended.
    """
    memberships = self.user_group_memberships  # type: _TYPE_LUGM
    emailable = [m.group for m in memberships if m.may_email_patients]
    emailable.sort(key=lambda g: g.name)
    return emailable

1247 

@property
def is_a_groupadmin(self) -> bool:
    """
    Has the user been explicitly made a group administrator of at least
    one group?
    """
    for membership in self.user_group_memberships:
        if membership.groupadmin:
            return True
    return False

1255 

@property
def authorized_as_groupadmin(self) -> bool:
    """
    Does the user have group-administrator authority for any group --
    either as an explicitly designated group administrator, or through
    being a superuser?
    """
    superuser_flag = self.superuser
    return superuser_flag if superuser_flag else self.is_a_groupadmin

1264 

def membership_for_group_id(
    self, group_id: int
) -> Optional[UserGroupMembership]:
    """
    Returns the :class:`UserGroupMembership` object relating this user
    to the group identified by ``group_id``.

    Args:
        group_id: ID of the group of interest.

    Returns:
        The first of the user's memberships whose ``group_id`` matches, or
        ``None`` if the user is not a member of that group. Callers must
        check for ``None`` before using the result.
    """
    for membership in self.user_group_memberships:
        if membership.group_id == group_id:
            return membership
    return None

1274 

def group_ids_nonsuperuser_may_see_when_unfiltered(self) -> List[int]:
    """
    IDs of the groups for which this user may see all patients when no
    filter is applied (memberships with
    ``view_all_patients_when_unfiltered`` set). Superuser status is not
    consulted here.
    """
    visible_ids = []  # type: List[int]
    for membership in self.user_group_memberships:
        if membership.view_all_patients_when_unfiltered:
            visible_ids.append(membership.group_id)
    return visible_ids

1285 

def may_upload_to_group(self, group_id: int) -> bool:
    """
    Is the user allowed to upload to the group with ID ``group_id``?

    Superusers always are; otherwise the user needs a membership of that
    group with ``may_upload`` set.
    """
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if membership.group_id == group_id and membership.may_upload:
            return True
    return False

1294 

1295 # ------------------------------------------------------------------------- 

1296 # Other permissions 

1297 # ------------------------------------------------------------------------- 

1298 

@property
def may_login_as_tablet(self) -> bool:
    """
    Is the user allowed to log in through the client (tablet) API? That
    requires permission either to upload or to register devices.
    """
    can_upload = self.may_upload
    return can_upload if can_upload else self.may_register_devices

1305 

@property
def may_use_webviewer(self) -> bool:
    """
    Is the user allowed to log in to the web front end?

    Superusers always are; otherwise at least one of the user's group
    memberships must have ``may_use_webviewer`` set.
    """
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if membership.may_use_webviewer:
            return True
    return False

1315 

def authorized_to_add_special_note(self, group_id: int) -> bool:
    """
    Is the user allowed to add special notes for the group with ID
    ``group_id``?

    Superusers always are. Other users need a membership of that group,
    and the answer is that membership's ``may_add_notes`` flag.
    """
    if self.superuser:
        return True
    membership = self.membership_for_group_id(group_id)
    return membership.may_add_notes if membership else False

1327 

def authorized_to_erase_tasks(self, group_id: int) -> bool:
    """
    Is the user allowed to erase tasks for the group with ID
    ``group_id``?

    Superusers always are. Other users need a membership of that group,
    and the answer is that membership's ``groupadmin`` flag.
    """
    if self.superuser:
        return True
    membership = self.membership_for_group_id(group_id)
    return membership.groupadmin if membership else False

1339 

@property
def authorized_to_dump(self) -> bool:
    """
    Is the user allowed to dump data for at least one group? Superusers
    always are.
    """
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if membership.may_dump_data:
            return True
    return False

1349 

@property
def authorized_for_reports(self) -> bool:
    """
    Is the user allowed to run reports for at least one group? Superusers
    always are.
    """
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if membership.may_run_reports:
            return True
    return False

1359 

@property
def authorized_to_manage_patients(self) -> bool:
    """
    Is the user allowed to manage patients for at least one group?

    Anyone with group-administrator authority (see
    ``authorized_as_groupadmin``) is; otherwise a membership with
    ``may_manage_patients`` set is needed.
    """
    if self.authorized_as_groupadmin:
        return True
    for membership in self.user_group_memberships:
        if membership.may_manage_patients:
            return True
    return False

1369 

@property
def authorized_to_email_patients(self) -> bool:
    """
    Is the user allowed to send emails to patients for at least one group?

    Anyone with group-administrator authority (see
    ``authorized_as_groupadmin``) is; otherwise a membership with
    ``may_email_patients`` set is needed.
    """
    if self.authorized_as_groupadmin:
        return True
    for membership in self.user_group_memberships:
        if membership.may_email_patients:
            return True
    return False

1379 

@property
def may_view_all_patients_when_unfiltered(self) -> bool:
    """
    With no filters applied, may the user view all patients in every
    group they belong to?

    Superusers always may. For others, every membership must have
    ``view_all_patients_when_unfiltered`` set (so a user with no
    memberships at all also yields True).
    """
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if not membership.view_all_patients_when_unfiltered:
            return False
    return True

1390 

@property
def may_view_no_patients_when_unfiltered(self) -> bool:
    """
    With no filters applied, is the user unable to view *any* patients?

    Never true for superusers. For others, it is true when none of their
    memberships has ``view_all_patients_when_unfiltered`` set (including
    when they have no memberships).
    """
    if self.superuser:
        return False
    for membership in self.user_group_memberships:
        if membership.view_all_patients_when_unfiltered:
            return False
    return True

1402 

@property
def may_upload(self) -> bool:
    """
    Is the user allowed to upload to their currently selected upload
    group? Always False when no upload group is selected.
    """
    upload_gid = self.upload_group_id
    if upload_gid is None:
        return False
    return self.may_upload_to_group(upload_gid)

1411 

@property
def may_register_devices(self) -> bool:
    """
    Is the user allowed to register devices?

    Registration is governed by the user's chosen upload group. A group
    must have been chosen -- even by superusers -- because the tablet
    wants group ID policies at the moment of registration, so we have to
    know which group. Given a chosen group, superusers may always
    register; other users need a membership of that group with
    ``may_register_devices`` set.
    """
    upload_gid = self.upload_group_id
    if upload_gid is None:
        return False
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if membership.group_id == upload_gid and membership.may_register_devices:
            return True
    return False

1432 

1433 # ------------------------------------------------------------------------- 

1434 # Managing other users 

1435 # ------------------------------------------------------------------------- 

1436 

def managed_users(self) -> Optional[Query]:
    """
    Build a query for all the users that this user manages.

    - Users who are neither superusers nor group administrators manage
      nobody (an intentionally empty query is returned).
    - Superusers manage everyone except the system user.
    - Group administrators manage users who share one of the groups they
      administer, excluding superusers and excluding anyone who is
      themselves a group administrator.

    Results are ordered by username.

    LOGIC SHOULD MATCH :meth:`may_edit_user`.
    """
    dbsession = SqlASession.object_session(self)
    if not (self.superuser or self.is_a_groupadmin):
        # Deliberately empty query; see
        # https://stackoverflow.com/questions/10345327/sqlalchemy-create-an-intentionally-empty-query  # noqa
        return dbsession.query(User).filter(false())

    everyone_but_system = (
        dbsession.query(User)
        .filter(User.username != USER_NAME_FOR_SYSTEM)
        .order_by(User.username)
    )
    if self.superuser:
        return everyone_but_system

    # Group administrator: restrict to users who are members of groups
    # that I am an admin for.
    groupadmin_group_ids = self.ids_of_groups_user_is_admin_for
    # noinspection PyUnresolvedReferences
    ugm2 = UserGroupMembership.__table__.alias("ugm2")
    # NOT EXISTS subquery; see
    # https://stackoverflow.com/questions/14600619/using-not-exists-clause-in-sqlalchemy-orm-query  # noqa
    candidate_is_groupadmin = (
        exists()
        .select_from(ugm2)
        .where(and_(ugm2.c.user_id == User.id, ugm2.c.groupadmin))
    )
    return (
        everyone_but_system.join(User.user_group_memberships)
        # ... no superusers
        .filter(not_(User.superuser))
        # ... user must be a member of one of our groups
        .filter(UserGroupMembership.group_id.in_(groupadmin_group_ids))
        # ... no groupadmins
        .filter(~candidate_is_groupadmin)
    )

1474 

def may_edit_user(
    self, req: "CamcopsRequest", other: "User"
) -> Tuple[bool, str]:
    """
    Decide whether the ``self`` user may edit the ``other`` user.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            (used for translating the refusal message)
        other: the user to be edited (potentially)

    Returns:
        tuple: may_edit (bool), reason_why_not (str); the reason is an
        empty string when editing is permitted

    LOGIC SHOULD MATCH :meth:`managed_users`.
    """
    _ = req.gettext
    if other.username == USER_NAME_FOR_SYSTEM:
        return False, _("Nobody may edit the system user")
    if self.superuser:
        # Superusers may edit anyone else.
        return True, ""
    if other.superuser:
        return False, _("You may not edit a superuser")
    if other.is_a_groupadmin:
        return False, _("You may not edit a group administrator")
    groupadmin_group_ids = self.ids_of_groups_user_is_admin_for
    for gid in other.group_ids:
        if gid in groupadmin_group_ids:
            # We administer at least one of the other user's groups.
            return True, ""
    return (
        False,
        _(
            "You are not a group administrator for any "
            "groups that this user is in"
        ),
    )

1508 

1509 

1510# ============================================================================= 

1511# Command-line password control 

1512# ============================================================================= 

1513 

1514 

def set_password_directly(
    req: "CamcopsRequest", username: str, password: str
) -> bool:
    """
    Command-line helper: set the password of the named user, if that user
    exists.

    On success the user is also enabled and the change is audited as
    coming from the console.

    Returns:
        True if the user was found and updated; False if no such user.
    """
    user = User.get_user_by_name(req.dbsession, username)
    if user:
        user.set_password(req, password)
        user.enable(req)
        message = "Password changed for user " + user.username
        audit(req, message, from_console=True)
        return True
    return False