Coverage for cc_modules/cc_group.py: 75%

77 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 15:51 +0100

1""" 

2camcops_server/cc_modules/cc_group.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Group definitions.** 

27 

28""" 

29 

30import logging 

31from typing import List, Optional, Set 

32 

33from cardinal_pythonlib.logs import BraceStyleAdapter 

34from cardinal_pythonlib.reprfunc import simple_repr 

35from cardinal_pythonlib.sqlalchemy.orm_inspect import gen_columns 

36from cardinal_pythonlib.sqlalchemy.orm_query import exists_orm 

37from sqlalchemy.ext.associationproxy import association_proxy 

38from sqlalchemy.orm import ( 

39 Mapped, 

40 mapped_column, 

41 relationship, 

42 Session as SqlASession, 

43) 

44from sqlalchemy.sql.schema import Column, ForeignKey, Table 

45from sqlalchemy.sql.sqltypes import Integer 

46 

47from camcops_server.cc_modules.cc_ipuse import IpUse 

48from camcops_server.cc_modules.cc_policy import TokenizedPolicy 

49from camcops_server.cc_modules.cc_sqla_coltypes import ( 

50 GroupDescriptionColType, 

51 GroupNameColType, 

52 IdPolicyColType, 

53) 

54from camcops_server.cc_modules.cc_sqlalchemy import Base 

55 

56log = BraceStyleAdapter(logging.getLogger(__name__)) 

57 

58 

59# ============================================================================= 

60# Group-to-group association table 

61# ============================================================================= 

62# A group can always see itself, but may also have permission to see others; 

63# see "Groups" in the CamCOPS documentation. 

64 

65# https://docs.sqlalchemy.org/en/latest/orm/join_conditions.html#self-referential-many-to-many-relationship # noqa 

# Self-referential many-to-many association table: each row links one group
# ("group_id") to another group that it is permitted to see
# ("can_see_group_id"). Both columns are foreign keys to the primary key of
# the "_security_groups" table and together they form this table's composite
# primary key, so any given (group_id, can_see_group_id) pair can be stored
# at most once.
group_group_table = Table(
    "_security_group_group",
    Base.metadata,
    # The group doing the looking.
    Column(
        "group_id",
        Integer,
        ForeignKey("_security_groups.id"),
        primary_key=True,
    ),
    # The group being looked at.
    Column(
        "can_see_group_id",
        Integer,
        ForeignKey("_security_groups.id"),
        primary_key=True,
    ),
)

82 

83 

84# ============================================================================= 

85# Group 

86# ============================================================================= 

87 

88 

class Group(Base):
    """
    Represents a CamCOPS group.

    See "Groups" in the CamCOPS documentation.
    """

    __tablename__ = "_security_groups"

    # -------------------------------------------------------------------------
    # Columns
    # -------------------------------------------------------------------------

    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
        index=True,
        comment="Group ID",
    )
    name: Mapped[str] = mapped_column(
        GroupNameColType,
        index=True,
        unique=True,
        comment="Group name",
    )
    description: Mapped[Optional[str]] = mapped_column(
        GroupDescriptionColType,
        comment="Description of the group",
    )
    upload_policy: Mapped[Optional[str]] = mapped_column(
        IdPolicyColType,
        comment="Upload policy for the group, as a string",
    )
    finalize_policy: Mapped[Optional[str]] = mapped_column(
        IdPolicyColType,
        comment="Finalize policy for the group, as a string",
    )

    ip_use_id: Mapped[Optional[int]] = mapped_column(
        ForeignKey(IpUse.id),
        comment=f"FK to {IpUse.__tablename__}.{IpUse.id.name}",
    )

    # -------------------------------------------------------------------------
    # Relationships
    # -------------------------------------------------------------------------

    # The IpUse row referenced by ip_use_id. With "delete-orphan" cascade and
    # single_parent=True, the IpUse object is treated as owned by this group.
    ip_use = relationship(
        IpUse, uselist=False, single_parent=True, cascade="all, delete-orphan"
    )

    # Users are reached through UserGroupMembership association objects
    # (named by string here rather than imported); "users" is a convenience
    # proxy yielding the "user" attribute of each membership.
    user_group_memberships = relationship(
        "UserGroupMembership", back_populates="group"
    )
    users = association_proxy("user_group_memberships", "user")

    # As above, but restricted by the join condition to users whose
    # auto_generated flag is false. Read-only (viewonly=True).
    regular_user_group_memberships = relationship(
        "UserGroupMembership",
        primaryjoin="and_("
        "Group.id==UserGroupMembership.group_id, "
        "User.id==UserGroupMembership.user_id, "
        "User.auto_generated==False)",
        viewonly=True,
    )
    regular_users = association_proxy("regular_user_group_memberships", "user")

    # Groups that this group is explicitly permitted to see, via
    # group_group_table.
    can_see_other_groups = relationship(
        "Group",  # link back to our own class
        secondary=group_group_table,  # via this mapping table
        primaryjoin=(id == group_group_table.c.group_id),  # "us"
        secondaryjoin=(id == group_group_table.c.can_see_group_id),  # "them"
        back_populates="groups_that_can_see_us",
        lazy="joined",  # not sure this does anything here
        cascade_backrefs=False,
    )

    # The reverse direction of can_see_other_groups: groups that are
    # explicitly permitted to see this group.
    groups_that_can_see_us = relationship(
        "Group",
        secondary=group_group_table,  # via this mapping table
        primaryjoin=(id == group_group_table.c.can_see_group_id),  # "us"
        secondaryjoin=(id == group_group_table.c.group_id),  # "them"
        back_populates="can_see_other_groups",
        cascade_backrefs=False,
    )

    # -------------------------------------------------------------------------
    # String representations
    # -------------------------------------------------------------------------

    def __str__(self) -> str:
        return f"Group {self.id} ({self.name})"

    def __repr__(self) -> str:
        # Sort the column attribute names so the representation is stable.
        attrnames = sorted(attrname for attrname, _ in gen_columns(self))
        return simple_repr(self, attrnames)

    # -------------------------------------------------------------------------
    # Inter-group visibility
    # -------------------------------------------------------------------------

    def ids_of_other_groups_group_may_see(self) -> Set[int]:
        """
        Returns the set of group IDs for the groups listed in
        :attr:`can_see_other_groups`, i.e. those that this group has been
        given explicit permission to see.

        Our own ID is not added by this method; use
        :meth:`ids_of_groups_group_may_see` for that. Linked groups that do
        not yet have an ID are skipped.
        """
        return {
            other_group.id
            for other_group in self.can_see_other_groups
            if other_group.id is not None
        }

    def ids_of_groups_group_may_see(self) -> Set[int]:
        """
        Returns the set of group IDs for groups that this group has permission
        to see, including our own group ID.

        If this group has no ID yet, our own ID is omitted rather than adding
        ``None`` to the set.
        """
        # The helper returns a fresh set, so it is safe to add to it here.
        group_ids = self.ids_of_other_groups_group_may_see()
        if self.id is not None:
            group_ids.add(self.id)
        return group_ids

    # -------------------------------------------------------------------------
    # Fetching groups
    # -------------------------------------------------------------------------

    @classmethod
    def get_groups_from_id_list(
        cls, dbsession: SqlASession, group_ids: List[int]
    ) -> List["Group"]:
        """
        Fetches groups from a list of group IDs.

        Args:
            dbsession: the SQLAlchemy session to query
            group_ids: IDs of the groups to fetch

        Returns:
            the matching groups (in no guaranteed order); an empty list if
            ``group_ids`` is empty
        """
        if not group_ids:
            # Nothing can match, so don't bother the database.
            return []
        return dbsession.query(cls).filter(cls.id.in_(group_ids)).all()

    @classmethod
    def get_group_by_name(
        cls, dbsession: SqlASession, name: str
    ) -> Optional["Group"]:
        """
        Fetches a group from its name.

        Returns ``None`` if ``name`` is empty/``None`` or if no group has that
        name.
        """
        if not name:
            return None
        return dbsession.query(cls).filter(cls.name == name).first()

    @classmethod
    def get_group_by_id(
        cls, dbsession: SqlASession, group_id: int
    ) -> Optional["Group"]:
        """
        Fetches a group from its integer ID.

        Returns ``None`` if ``group_id`` is ``None`` or if no group has that
        ID.
        """
        if group_id is None:
            return None
        return dbsession.query(cls).filter(cls.id == group_id).first()

    @classmethod
    def get_all_groups(cls, dbsession: SqlASession) -> List["Group"]:
        """
        Returns all groups.
        """
        return dbsession.query(cls).all()

    @classmethod
    def all_group_ids(cls, dbsession: SqlASession) -> List[int]:
        """
        Returns all group IDs, in ascending order.
        """
        # Select only the ID column: there is no need to load whole Group
        # objects (and their eagerly joined relationships) for this.
        query = dbsession.query(cls.id).order_by(cls.id)
        return [group_id for (group_id,) in query]

    @classmethod
    def all_group_names(cls, dbsession: SqlASession) -> List[str]:
        """
        Returns all group names, ordered by group ID.
        """
        # As for all_group_ids(): fetch just the column we need.
        query = dbsession.query(cls.name).order_by(cls.id)
        return [group_name for (group_name,) in query]

    @classmethod
    def group_exists(cls, dbsession: SqlASession, group_id: int) -> bool:
        """
        Does a particular group (specified by its integer ID) exist?
        """
        return exists_orm(dbsession, cls, cls.id == group_id)  # type: ignore[arg-type]  # noqa: E501

    # -------------------------------------------------------------------------
    # Policies
    # -------------------------------------------------------------------------

    def tokenized_upload_policy(self) -> TokenizedPolicy:
        """
        Returns the upload policy for a group, wrapped in a
        :class:`TokenizedPolicy`.
        """
        return TokenizedPolicy(self.upload_policy)

    def tokenized_finalize_policy(self) -> TokenizedPolicy:
        """
        Returns the finalize policy for a group, wrapped in a
        :class:`TokenizedPolicy`.
        """
        return TokenizedPolicy(self.finalize_policy)