Coverage for cc_modules/cc_audit.py : 60%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
3"""
4camcops_server/cc_modules/cc_audit.py
6===============================================================================
8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com).
10 This file is part of CamCOPS.
12 CamCOPS is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
17 CamCOPS is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License for more details.
22 You should have received a copy of the GNU General Public License
23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
25===============================================================================
27**Auditing.**
29The Big Brother part.
31"""
33from typing import TYPE_CHECKING
35from sqlalchemy.orm import relationship
36from sqlalchemy.sql.schema import Column, ForeignKey
37from sqlalchemy.sql.sqltypes import DateTime, Integer, UnicodeText
39from camcops_server.cc_modules.cc_sqla_coltypes import (
40 AuditSourceColType,
41 IPAddressColType,
42 TableNameColType,
43)
44from camcops_server.cc_modules.cc_sqlalchemy import Base
46if TYPE_CHECKING:
47 from camcops_server.cc_modules.cc_request import CamcopsRequest
50MAX_AUDIT_STRING_LENGTH = 65000
53# =============================================================================
54# AuditEntry
55# =============================================================================
57class AuditEntry(Base):
58 """
59 An entry in the audit table.
60 """
61 __tablename__ = "_security_audit"
63 id = Column(
64 "id", Integer,
65 primary_key=True, autoincrement=True, index=True,
66 comment="Arbitrary primary key"
67 )
68 when_access_utc = Column(
69 "when_access_utc", DateTime,
70 nullable=False, index=True,
71 comment="Date/time of access (UTC)"
72 )
73 source = Column(
74 "source", AuditSourceColType,
75 nullable=False,
76 comment="Source (e.g. tablet, webviewer)"
77 )
78 remote_addr = Column(
79 "remote_addr", IPAddressColType,
80 comment="IP address of the remote computer"
81 )
82 user_id = Column(
83 "user_id", Integer, ForeignKey("_security_users.id"),
84 comment="ID of user, where applicable"
85 )
86 user = relationship("User")
87 device_id = Column(
88 "device_id", Integer, ForeignKey("_security_devices.id"),
89 comment="Device ID, where applicable"
90 )
91 device = relationship("Device")
92 table_name = Column(
93 "table_name", TableNameColType,
94 comment="Table involved, where applicable"
95 )
96 server_pk = Column(
97 "server_pk", Integer,
98 comment="Server PK (table._pk), where applicable"
99 )
100 patient_server_pk = Column(
101 "patient_server_pk", Integer,
102 comment="Server PK of the patient (patient._pk) concerned, or "
103 "NULL if not applicable"
104 )
105 details = Column(
106 "details", UnicodeText,
107 comment="Details of the access"
108 ) # in practice, has 65,535 character limit and isn't Unicode.
109 # See MAX_AUDIT_STRING_LENGTH above.
112# =============================================================================
113# Audit function
114# =============================================================================
116def audit(req: "CamcopsRequest",
117 details: str,
118 patient_server_pk: int = None,
119 table: str = None,
120 server_pk: int = None,
121 device_id: int = None,
122 remote_addr: str = None,
123 user_id: int = None,
124 from_console: bool = False,
125 from_dbclient: bool = False) -> None:
126 """
127 Write an entry to the audit log.
128 """
129 dbsession = req.dbsession
130 if not remote_addr:
131 remote_addr = req.remote_addr if req else None
132 if user_id is None:
133 user_id = req.user_id
134 if from_console:
135 source = "console"
136 elif from_dbclient:
137 source = "tablet"
138 else:
139 source = "webviewer"
140 now = req.now_utc
141 if details and len(details) > MAX_AUDIT_STRING_LENGTH:
142 details = details[:MAX_AUDIT_STRING_LENGTH]
143 # noinspection PyTypeChecker
144 entry = AuditEntry(
145 when_access_utc=now,
146 source=source,
147 remote_addr=remote_addr,
148 user_id=user_id,
149 device_id=device_id,
150 table_name=table,
151 server_pk=server_pk,
152 patient_server_pk=patient_server_pk,
153 details=details
154 )
155 dbsession.add(entry)