Coverage for cc_modules/cc_audit.py: 92%
40 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
1"""
2camcops_server/cc_modules/cc_audit.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Auditing.**
28The Big Brother part.
30"""
32import datetime
33from typing import Optional, TYPE_CHECKING
35from sqlalchemy.orm import Mapped, mapped_column, relationship
36from sqlalchemy.sql.schema import ForeignKey
37from sqlalchemy.sql.sqltypes import UnicodeText
39from camcops_server.cc_modules.cc_sqla_coltypes import (
40 AuditSourceColType,
41 IPAddressColType,
42 TableNameColType,
43)
44from camcops_server.cc_modules.cc_sqlalchemy import Base
46if TYPE_CHECKING:
47 from camcops_server.cc_modules.cc_request import CamcopsRequest
# Maximum number of characters of "details" text stored in one audit entry;
# audit() truncates anything longer than this. Chosen to sit below the
# 65,535-character practical limit noted on the AuditEntry.details column.
MAX_AUDIT_STRING_LENGTH = 65000
53# =============================================================================
54# AuditEntry
55# =============================================================================
class AuditEntry(Base):
    """
    An entry in the audit table (``_security_audit``).

    Each row records a single audited access: when it happened, where it came
    from (source type and remote IP address), which user and/or device was
    involved, and -- where applicable -- the table, record and patient
    concerned, plus free-text details.

    Only ``id``, ``when_access_utc`` and ``source`` are annotated as
    non-optional; every other column may be NULL.
    """

    __tablename__ = "_security_audit"

    # Surrogate key; carries no meaning of its own.
    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        index=True,
        comment="Arbitrary primary key",
    )
    # Timestamp of the audited event, in UTC; indexed.
    when_access_utc: Mapped[datetime.datetime] = mapped_column(
        index=True,
        comment="Date/time of access (UTC)",
    )
    # Origin of the event. audit() in this module writes one of "console",
    # "tablet" or "webviewer" here.
    source: Mapped[str] = mapped_column(
        AuditSourceColType,
        comment="Source (e.g. tablet, webviewer)",
    )
    remote_addr: Mapped[Optional[str]] = mapped_column(
        IPAddressColType,
        comment="IP address of the remote computer",
    )
    # Foreign key into the users table; NULL when no user is associated.
    user_id: Mapped[Optional[int]] = mapped_column(
        ForeignKey("_security_users.id"),
        comment="ID of user, where applicable",
    )
    # ORM relationship to the "User" class (presumably joined via user_id,
    # the only foreign key to the users table here -- confirm against User).
    user = relationship("User")
    # Foreign key into the devices table; NULL when no device is associated.
    device_id: Mapped[Optional[int]] = mapped_column(
        ForeignKey("_security_devices.id"),
        comment="Device ID, where applicable",
    )
    # ORM relationship to the "Device" class (presumably joined via
    # device_id -- confirm against Device).
    device = relationship("Device")
    # The three columns below identify the record (and patient) the access
    # related to, when there is one.
    table_name: Mapped[Optional[str]] = mapped_column(
        TableNameColType,
        comment="Table involved, where applicable",
    )
    server_pk: Mapped[Optional[int]] = mapped_column(
        comment="Server PK (table._pk), where applicable"
    )
    patient_server_pk: Mapped[Optional[int]] = mapped_column(
        comment="Server PK of the patient (patient._pk) concerned, or "
        "NULL if not applicable",
    )
    # Free-text description of the event. The column name is passed
    # explicitly as the first positional argument.
    details: Mapped[Optional[str]] = mapped_column(
        "details", UnicodeText, comment="Details of the access"
    )  # in practice, has 65,535 character limit and isn't Unicode.
    # See MAX_AUDIT_STRING_LENGTH above.
110# =============================================================================
111# Audit function
112# =============================================================================
def audit(
    req: "CamcopsRequest",
    details: str,
    patient_server_pk: Optional[int] = None,
    table: Optional[str] = None,
    server_pk: Optional[int] = None,
    device_id: Optional[int] = None,
    remote_addr: Optional[str] = None,
    user_id: Optional[int] = None,
    from_console: bool = False,
    from_dbclient: bool = False,
) -> None:
    """
    Write an entry to the audit log.

    Builds an :class:`AuditEntry` and adds it to the request's database
    session. This function only calls ``add()`` on the session; it does not
    itself flush or commit.

    Args:
        req:
            the current request; supplies the database session, the
            timestamp (``now_utc``) and fallback values for ``remote_addr``
            and ``user_id``
        details:
            free-text description of the event; truncated to
            ``MAX_AUDIT_STRING_LENGTH`` characters if longer
        patient_server_pk:
            server PK of the patient concerned, if any
        table:
            name of the table involved, if any
        server_pk:
            server PK of the record involved, if any
        device_id:
            ID of the device involved, if any
        remote_addr:
            IP address to record; if falsy (``None`` or empty), the
            request's ``remote_addr`` is used instead
        user_id:
            ID of the user to record; if ``None``, the request's
            ``user_id`` is used instead
        from_console:
            record the source as ``"console"``; takes precedence over
            ``from_dbclient``
        from_dbclient:
            record the source as ``"tablet"``

    If neither ``from_console`` nor ``from_dbclient`` is set, the source is
    recorded as ``"webviewer"``.
    """
    dbsession = req.dbsession
    if not remote_addr:
        # NOTE: req has already been dereferenced above, so this guard cannot
        # protect against req being None; it only matters for a request
        # object that evaluates as falsy. Kept for backward compatibility.
        remote_addr = req.remote_addr if req else None
    if user_id is None:
        user_id = req.user_id
    if from_console:
        source = "console"
    elif from_dbclient:
        source = "tablet"
    else:
        source = "webviewer"
    now = req.now_utc
    # Keep the text within what the "details" column can hold in practice.
    if details and len(details) > MAX_AUDIT_STRING_LENGTH:
        details = details[:MAX_AUDIT_STRING_LENGTH]
    # noinspection PyTypeChecker
    entry = AuditEntry(
        when_access_utc=now,
        source=source,
        remote_addr=remote_addr,
        user_id=user_id,
        device_id=device_id,
        table_name=table,
        server_pk=server_pk,
        patient_server_pk=patient_server_pk,
        details=details,
    )
    dbsession.add(entry)