Coverage for src/meshadmin/server/networks/models.py: 93%

148 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-22 07:09 +0200

1from datetime import datetime, timedelta 

2from uuid import uuid4 

3 

4import structlog 

5from django.contrib.auth import get_user_model 

6from django.db import models 

7from django.db.models import UniqueConstraint 

8from django.utils import timezone 

9 

# Resolve the user model through Django's helper rather than importing a
# concrete class, so a project-configured user model is respected.
User = get_user_model()

# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)

13 

14 

class TimestampedModel(models.Model):
    """Abstract base model that adds creation and modification timestamps.

    Being abstract, it creates no table of its own; subclasses inherit the
    two columns below.
    """

    # Filled in automatically when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)
    # Refreshed automatically every time the row is saved.
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True

21 

22 

class NetworkMembership(TimestampedModel):
    """Through model tying a user to a network together with a role.

    A given (network, user) pair may exist at most once.
    """

    class Role(models.TextChoices):
        """Roles a user can hold inside a network."""

        ADMIN = "ADMIN", "Admin"
        MEMBER = "MEMBER", "Member"

    network = models.ForeignKey(
        "Network",
        on_delete=models.CASCADE,
        related_name="memberships",
    )
    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name="memberships",
    )
    # New memberships start out as plain members.
    role = models.CharField(
        max_length=20,
        choices=Role.choices,
        default=Role.MEMBER,
    )

    class Meta:
        constraints = [
            UniqueConstraint(
                fields=("network", "user"),
                name="unique_network_membership",
            ),
        ]

    def __str__(self):
        """Return the user, network and role as a single display string."""
        return f"{self.user} - {self.network} {self.role}"

43 

44 

class Network(TimestampedModel):
    """A uniquely named network with an address range and member users."""

    name = models.CharField(max_length=200, unique=True)
    # Stored as free text; no CIDR validation is applied at the model level.
    cidr = models.CharField(max_length=200, default="100.100.64.0/24")
    update_interval = models.IntegerField(
        default=5,
        help_text="Interval in seconds for host configuration updates",
    )
    # Users are attached through NetworkMembership, which also carries a role.
    members = models.ManyToManyField(
        User,
        through="NetworkMembership",
        related_name="networks",
    )

    def __str__(self):
        """Return the network name."""
        return self.name

57 

58 

class CA(TimestampedModel):
    """A named key/certificate pair belonging to a network.

    ``cert_print`` optionally holds a JSON description of the certificate;
    ``days_until_expiry`` reads ``cert_print["details"]["notAfter"]`` from it.
    """

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)

    key = models.TextField()
    cert = models.TextField()
    cert_print = models.JSONField(blank=True, null=True)

    def __str__(self):
        """Return the CA name."""
        return self.name

    @property
    def days_until_expiry(self):
        """Return the whole days left until the certificate expires.

        Returns:
            ``None`` when ``cert_print`` has no usable
            ``details.notAfter`` entry or the value cannot be parsed as an
            ISO-8601 timestamp; otherwise a non-negative ``int`` (``0`` once
            the expiry date has passed).
        """
        # EAFP lookup: cert_print is arbitrary JSON, so any level may be
        # missing, null, or a non-mapping value. All of those mean
        # "no expiry information" rather than an error.
        try:
            expiry_str = self.cert_print["details"]["notAfter"]
        except (KeyError, IndexError, TypeError):
            return None

        try:
            expiry_date = datetime.fromisoformat(expiry_str)
        except (ValueError, TypeError) as e:
            logger.error("error parsing expiry date", ca_name=self.name, error=e)
            return None

        now = timezone.now()
        # Naive and aware datetimes cannot be subtracted; align the parsed
        # value with whatever timezone.now() produced before comparing.
        if timezone.is_naive(expiry_date) and timezone.is_aware(now):
            # NOTE(review): a timestamp without an offset is interpreted in
            # the timezone of timezone.now() - confirm the certificate
            # tooling emits such values in that timezone.
            expiry_date = expiry_date.replace(tzinfo=now.tzinfo)
        elif timezone.is_aware(expiry_date) and timezone.is_naive(now):
            expiry_date = timezone.make_naive(expiry_date)

        # Already-expired certificates report 0 rather than a negative count.
        return max(0, (expiry_date - now).days)

87 

88 

class SigningCA(TimestampedModel):
    """Associates a network with a single CA record.

    The one-to-one link on ``network`` means each network has at most one
    such row.
    """

    network = models.OneToOneField(Network, on_delete=models.CASCADE)
    ca = models.ForeignKey(CA, on_delete=models.CASCADE)

    def __str__(self):
        """Return the name of the referenced CA."""
        return self.ca.name

95 

96 

class Group(TimestampedModel):
    """A named group inside a network; names are unique per network."""

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)
    description = models.TextField(blank=True)

    class Meta:
        constraints = [
            UniqueConstraint(
                fields=("network", "name"),
                name="unique_group_name_per_network",
            ),
        ]

    def __str__(self):
        """Return the group name."""
        return self.name

111 

112 

class GroupConfig(TimestampedModel):
    """A single configuration override (key/value) attached to a group.

    Each key may be overridden at most once per group.
    """

    # The only configuration keys that may be overridden.
    CONFIG_KEY_CHOICES = [
        ("lighthouse.dns.host", "Lighthouse DNS Host"),
        ("lighthouse.dns.port", "Lighthouse DNS Port"),
        ("lighthouse.serve_dns", "Lighthouse Serve DNS"),
    ]

    group = models.ForeignKey(
        Group,
        on_delete=models.CASCADE,
        related_name="config_overrides",
    )
    key = models.CharField(
        max_length=255,
        choices=CONFIG_KEY_CHOICES,
        help_text="Configuration property to override or add.",
    )
    # Always stored as text, whatever the key is.
    value = models.TextField(
        help_text="The value to override the config property with.",
    )

    class Meta:
        constraints = [
            UniqueConstraint(
                fields=("group", "key"),
                name="unique_group_config_key",
            ),
        ]

    def __str__(self):
        """Return the owning group's name and the overridden key."""
        return f"{self.group.name} - {self.key}"

139 

140 

class Rule(TimestampedModel):
    """A firewall rule attached to a group.

    The rule belongs to ``security_group``. The remaining fields describe
    what it matches: direction, protocol, port, peer group(s) and CIDRs.
    """

    class Direction(models.TextChoices):
        """Traffic direction the rule applies to."""

        INBOUND = "I", "inbound"
        OUTBOUND = "O", "outbound"

    class Protocol(models.TextChoices):
        """Protocols a rule can match."""

        ANY = "any", "any"
        UDP = "udp", "udp"
        TCP = "tcp", "tcp"
        ICMP = "icmp", "icmp"

    # The group that owns this rule.
    security_group = models.ForeignKey(
        Group,
        on_delete=models.CASCADE,
        related_name="rules",
    )

    direction = models.CharField(
        max_length=10,
        choices=Direction.choices,
        default=Direction.INBOUND,
    )

    # max_length=4 fits the longest protocol value, "icmp".
    proto = models.CharField(
        max_length=4,
        choices=Protocol.choices,
        default=Protocol.ANY,
        help_text="One of any, tcp, udp, or icmp",
    )

    port = models.CharField(
        max_length=255,
        help_text=(
            "Takes 0 or any as any, a single number (e.g. 80), a range (e.g. 200-901), "
            "or fragment to match second and further fragments of fragmented packets "
            "(since there is no port available)."
        ),
        default="any",
    )

    # Optional single peer group matched by the rule.
    group = models.ForeignKey(
        Group,
        on_delete=models.CASCADE,
        help_text="Can be any or a literal group name, ie default-group",
        blank=True,
        null=True,
        related_name="fw_groups",
    )

    # NOTE(review): "fw_groupss" looks like a typo, but it is the reverse
    # accessor name callers and migrations see, so it is kept as-is.
    groups = models.ManyToManyField(
        Group,
        blank=True,
        related_name="fw_groupss",
        help_text=(
            "Same as group but accepts multiple values. Multiple values are AND'd together "
            "and a certificate must contain all groups to pass."
        ),
    )

    cidr = models.CharField(
        max_length=255,
        help_text="a CIDR, 0.0.0.0/0 is any. This restricts which Nebula IP addresses the rule allows.",
        blank=True,
        null=True,
    )

    local_cidr = models.CharField(
        max_length=255,
        help_text=(
            "a local CIDR, 0.0.0.0/0 is any. This restricts which destination IP addresses, "
            "when using unsafe_routes, the rule allows. If unset, the rule will allow access "
            "to the specified ports on both the node itself as well as any IP addresses it routes to."
        ),
        blank=True,
        null=True,
    )

213 

214 

class Host(TimestampedModel):
    """A host that belongs to a network.

    Within one network both the host name and the assigned IP are unique.
    """

    class Meta:
        unique_together = (
            ("network", "name"),
            ("network", "assigned_ip"),
        )

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)
    assigned_ip = models.CharField(max_length=200, blank=True, null=True)

    # Lighthouse role and the public address associated with the host.
    is_lighthouse = models.BooleanField(default=False)
    public_ip_or_hostname = models.CharField(max_length=200, blank=True, null=True)

    # Relay behaviour: whether this host is a relay and whether it uses one.
    is_relay = models.BooleanField(default=False)
    use_relay = models.BooleanField(default=True)

    # Key material and group membership.
    public_key = models.TextField(max_length=1000, blank=True, null=True)
    public_auth_kid = models.CharField(max_length=200, blank=True, null=True)
    public_auth_key = models.TextField(max_length=1000, blank=True, null=True)
    groups = models.ManyToManyField(Group, blank=True)
    interface = models.CharField(max_length=200, default="nebula1")

    # Read by is_config_stale; empty until a refresh has been recorded.
    last_config_refresh = models.DateTimeField(blank=True, null=True)

    config_freeze = models.BooleanField(
        default=False,
        help_text="When true, host will not receive automatic config updates",
    )

    is_ephemeral = models.BooleanField(
        default=False,
        help_text="When true, this host will be removed if offline for over 10 minutes",
    )

    def __str__(self):
        """Return the host name."""
        return self.name

    @property
    def is_config_stale(self):
        """Return True when the config was never refreshed or is over 24h old."""
        refreshed_at = self.last_config_refresh
        if refreshed_at:
            cutoff = timezone.now() - timedelta(hours=24)
            return refreshed_at < cutoff
        # No recorded refresh at all counts as stale.
        return True

257 

258 

class HostCert(TimestampedModel):
    """A certificate stored for a host, linked to a CA record."""

    host = models.ForeignKey(Host, on_delete=models.CASCADE)
    ca = models.ForeignKey(CA, on_delete=models.CASCADE)
    cert = models.TextField(max_length=1000)
    # NOTE(review): how this integer hash is computed is not visible here.
    hash = models.IntegerField(default=0)

264 

265 

class HostConfig(TimestampedModel):
    """A configuration text stored for a host, with an optional digest."""

    host = models.ForeignKey(Host, on_delete=models.CASCADE)
    config = models.TextField()
    # Presumably the SHA-256 of ``config`` - confirm against the code that fills it.
    sha256 = models.CharField(max_length=200, blank=True, null=True)

270 

271 

class Template(TimestampedModel):
    """Enrollment template: host settings plus an enrollment key and limits."""

    name = models.CharField(max_length=200)
    network = models.ForeignKey(Network, on_delete=models.CASCADE)

    # Settings that mirror the same-named fields on Host.
    is_lighthouse = models.BooleanField(default=False)
    is_relay = models.BooleanField(default=False)
    use_relay = models.BooleanField(default=True)
    groups = models.ManyToManyField(Group, blank=True)

    # A fresh uuid4 is generated as the default for every new template.
    enrollment_key = models.CharField(
        max_length=255,
        default=uuid4,
        unique=True,
    )

    reusable = models.BooleanField(
        default=True,
        help_text="When false, this key can not be used multiple times",
    )
    usage_limit = models.IntegerField(
        null=True,
        blank=True,
        help_text="Maximum number of peers that can enroll with this key. Null means unlimited.",
    )
    expires_at = models.DateTimeField(
        null=True,
        blank=True,
        help_text="When this key expires. Null means no expiration.",
    )
    usage_count = models.IntegerField(
        default=0,
        help_text="Number of times this key has been used",
    )
    ephemeral_peers = models.BooleanField(
        default=False,
        help_text="When true, peers that are offline for over 10 minutes will be removed",
    )

    def __str__(self):
        """Return the template name."""
        return self.name

304 

305 

class ConfigRollout(TimestampedModel):
    """A named configuration rollout for a network.

    Records a status, the hosts targeted, and the hosts that have completed.
    """

    # Allowed values for ``status``, as (stored value, label) pairs.
    STATUS_CHOICES = [
        ("PENDING", "Pending"),
        ("IN_PROGRESS", "In Progress"),
        ("COMPLETED", "Completed"),
        ("FAILED", "Failed"),
    ]

    name = models.CharField(max_length=200)
    status = models.CharField(
        max_length=20,
        choices=STATUS_CHOICES,
        default="PENDING",
    )
    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    # Two separate host sets, each with its own reverse accessor on Host.
    target_hosts = models.ManyToManyField(Host, related_name="pending_rollouts")
    completed_hosts = models.ManyToManyField(Host, related_name="completed_rollouts")
    notes = models.TextField(blank=True)