Coverage for cc_modules/cc_audit.py: 92%

40 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 15:51 +0100

1""" 

2camcops_server/cc_modules/cc_audit.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Auditing.** 

27 

28The Big Brother part. 

29 

30""" 

31 

32import datetime 

33from typing import Optional, TYPE_CHECKING 

34 

35from sqlalchemy.orm import Mapped, mapped_column, relationship 

36from sqlalchemy.sql.schema import ForeignKey 

37from sqlalchemy.sql.sqltypes import UnicodeText 

38 

39from camcops_server.cc_modules.cc_sqla_coltypes import ( 

40 AuditSourceColType, 

41 IPAddressColType, 

42 TableNameColType, 

43) 

44from camcops_server.cc_modules.cc_sqlalchemy import Base 

45 

46if TYPE_CHECKING: 

47 from camcops_server.cc_modules.cc_request import CamcopsRequest 

48 

49 

50MAX_AUDIT_STRING_LENGTH = 65000 

51 

52 

53# ============================================================================= 

54# AuditEntry 

55# ============================================================================= 

56 

57 

58class AuditEntry(Base): 

59 """ 

60 An entry in the audit table. 

61 """ 

62 

63 __tablename__ = "_security_audit" 

64 

65 id: Mapped[int] = mapped_column( 

66 primary_key=True, 

67 autoincrement=True, 

68 index=True, 

69 comment="Arbitrary primary key", 

70 ) 

71 when_access_utc: Mapped[datetime.datetime] = mapped_column( 

72 index=True, 

73 comment="Date/time of access (UTC)", 

74 ) 

75 source: Mapped[str] = mapped_column( 

76 AuditSourceColType, 

77 comment="Source (e.g. tablet, webviewer)", 

78 ) 

79 remote_addr: Mapped[Optional[str]] = mapped_column( 

80 IPAddressColType, 

81 comment="IP address of the remote computer", 

82 ) 

83 user_id: Mapped[Optional[int]] = mapped_column( 

84 ForeignKey("_security_users.id"), 

85 comment="ID of user, where applicable", 

86 ) 

87 user = relationship("User") 

88 device_id: Mapped[Optional[int]] = mapped_column( 

89 ForeignKey("_security_devices.id"), 

90 comment="Device ID, where applicable", 

91 ) 

92 device = relationship("Device") 

93 table_name: Mapped[Optional[str]] = mapped_column( 

94 TableNameColType, 

95 comment="Table involved, where applicable", 

96 ) 

97 server_pk: Mapped[Optional[int]] = mapped_column( 

98 comment="Server PK (table._pk), where applicable" 

99 ) 

100 patient_server_pk: Mapped[Optional[int]] = mapped_column( 

101 comment="Server PK of the patient (patient._pk) concerned, or " 

102 "NULL if not applicable", 

103 ) 

104 details: Mapped[Optional[str]] = mapped_column( 

105 "details", UnicodeText, comment="Details of the access" 

106 ) # in practice, has 65,535 character limit and isn't Unicode. 

107 # See MAX_AUDIT_STRING_LENGTH above. 

108 

109 

110# ============================================================================= 

111# Audit function 

112# ============================================================================= 

113 

114 

115def audit( 

116 req: "CamcopsRequest", 

117 details: str, 

118 patient_server_pk: int = None, 

119 table: str = None, 

120 server_pk: int = None, 

121 device_id: int = None, 

122 remote_addr: str = None, 

123 user_id: int = None, 

124 from_console: bool = False, 

125 from_dbclient: bool = False, 

126) -> None: 

127 """ 

128 Write an entry to the audit log. 

129 """ 

130 dbsession = req.dbsession 

131 if not remote_addr: 

132 remote_addr = req.remote_addr if req else None 

133 if user_id is None: 

134 user_id = req.user_id 

135 if from_console: 

136 source = "console" 

137 elif from_dbclient: 

138 source = "tablet" 

139 else: 

140 source = "webviewer" 

141 now = req.now_utc 

142 if details and len(details) > MAX_AUDIT_STRING_LENGTH: 

143 details = details[:MAX_AUDIT_STRING_LENGTH] 

144 # noinspection PyTypeChecker 

145 entry = AuditEntry( 

146 when_access_utc=now, 

147 source=source, 

148 remote_addr=remote_addr, 

149 user_id=user_id, 

150 device_id=device_id, 

151 table_name=table, 

152 server_pk=server_pk, 

153 patient_server_pk=patient_server_pk, 

154 details=details, 

155 ) 

156 dbsession.add(entry)