Coverage for src/meshadmin/server/networks/models.py: 92%

168 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-06 11:46 +0200

1from datetime import datetime, timedelta 

2from uuid import uuid4 

3 

4import httpx 

5import structlog 

6from django.contrib.auth import get_user_model 

7from django.db import models 

8from django.db.models import UniqueConstraint 

9from django.utils import timezone 

10 

# Resolve the active user model once at import time; the models below use it
# as a foreign-key / many-to-many target.
User = get_user_model()

# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)

14 

15 

class TimestampedModel(models.Model):
    """Abstract base model adding ``created_at`` / ``updated_at`` columns."""

    # auto_now_add: stamped when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)
    # auto_now: re-stamped each time the instance is saved.
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        # No table of its own; concrete subclasses inherit the two fields.
        abstract = True

22 

23 

class NetworkMembership(TimestampedModel):
    """Links one user to one network and records the user's role in it."""

    class Role(models.TextChoices):
        ADMIN = "ADMIN", "Admin"
        MEMBER = "MEMBER", "Member"

    # "Network" is referenced by name because that class is declared further
    # down in this module.
    network = models.ForeignKey(
        "Network",
        related_name="memberships",
        on_delete=models.CASCADE,
    )
    user = models.ForeignKey(
        User,
        related_name="memberships",
        on_delete=models.CASCADE,
    )
    role = models.CharField(
        max_length=20,
        choices=Role.choices,
        default=Role.MEMBER,
    )

    class Meta:
        constraints = [
            # A user can hold at most one membership row per network.
            UniqueConstraint(
                name="unique_network_membership",
                fields=("network", "user"),
            ),
        ]

    def __str__(self):
        """Return ``"<user> - <network> <role>"``."""
        return f"{self.user} - {self.network} {self.role}"

44 

45 

class Network(TimestampedModel):
    """A named network with an address range and a set of member users."""

    name = models.CharField(max_length=200, unique=True)
    # Stored as plain text; no CIDR validation is applied by this field.
    cidr = models.CharField(max_length=200, default="100.100.64.0/24")
    update_interval = models.IntegerField(
        help_text="Interval in seconds for host configuration updates",
        default=5,
    )
    # Users are attached through NetworkMembership, which also carries a role.
    members = models.ManyToManyField(
        User,
        related_name="networks",
        through="NetworkMembership",
    )

    def __str__(self):
        """Return the network name."""
        return self.name

58 

59 

class CA(TimestampedModel):
    """Certificate authority (key + certificate) that belongs to a network."""

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)

    key = models.TextField()
    cert = models.TextField()
    # Optional JSON document; ``days_until_expiry`` reads
    # ``cert_print["details"]["notAfter"]`` from it.
    cert_print = models.JSONField(blank=True, null=True)

    def __str__(self):
        """Return the CA name."""
        return self.name

    @property
    def days_until_expiry(self):
        """Return the whole days left until ``notAfter``, floored at 0.

        Returns:
            An ``int`` >= 0, or ``None`` when ``cert_print`` has no usable
            ``details.notAfter`` entry or the timestamp cannot be parsed or
            compared with the current time (the latter case is logged).
        """
        cert_print = self.cert_print
        # JSONField can hold any JSON value, so make sure both levels really
        # are mappings before subscripting them; otherwise a malformed
        # document would raise TypeError out of this property.
        details = cert_print.get("details") if isinstance(cert_print, dict) else None
        if not isinstance(details, dict) or "notAfter" not in details:
            return None

        expiry_str = details["notAfter"]
        try:
            # datetime.fromisoformat() only accepts a trailing "Z" from
            # Python 3.11 on; normalise it to an explicit UTC offset so older
            # interpreters parse RFC 3339 timestamps as well.
            if isinstance(expiry_str, str) and expiry_str.endswith("Z"):
                expiry_str = f"{expiry_str[:-1]}+00:00"
            expiry_date = datetime.fromisoformat(expiry_str)
            # Mixing naive and aware datetimes raises TypeError, handled below.
            days = (expiry_date - timezone.now()).days
        except (ValueError, TypeError) as e:
            logger.error("error parsing expiry date", ca_name=self.name, error=e)
            return None

        # An already expired certificate reports 0 rather than a negative count.
        return max(0, days)

88 

89 

class SigningCA(TimestampedModel):
    """Points a network at one CA row.

    ``network`` is a one-to-one relation, so each network has at most one
    ``SigningCA``. Presumably this marks the CA currently used for signing
    (inferred from the name; confirm against callers).
    """

    network = models.OneToOneField(Network, on_delete=models.CASCADE)
    ca = models.ForeignKey(CA, on_delete=models.CASCADE)

    def __str__(self):
        """Return the name of the referenced CA."""
        return self.ca.name

96 

97 

class Group(TimestampedModel):
    """A named group inside a network, with an optional description."""

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)
    description = models.TextField(blank=True)

    class Meta:
        constraints = [
            # Group names must be unique within a network.
            UniqueConstraint(
                name="unique_group_name_per_network",
                fields=("network", "name"),
            ),
        ]

    def __str__(self):
        """Return the group name."""
        return self.name

112 

113 

class GroupConfig(TimestampedModel):
    """A single key/value configuration override attached to a group."""

    # Keys that may be overridden, as (stored value, label) pairs.
    CONFIG_KEY_CHOICES = [
        ("lighthouse.dns.host", "Lighthouse DNS Host"),
        ("lighthouse.dns.port", "Lighthouse DNS Port"),
        ("lighthouse.serve_dns", "Lighthouse Serve DNS"),
    ]

    group = models.ForeignKey(
        Group,
        related_name="config_overrides",
        on_delete=models.CASCADE,
    )
    key = models.CharField(
        max_length=255,
        help_text="Configuration property to override or add.",
        choices=CONFIG_KEY_CHOICES,
    )
    # Always stored as text, whatever the key.
    value = models.TextField(
        help_text="The value to override the config property with."
    )

    class Meta:
        constraints = [
            # Each key can be overridden at most once per group.
            UniqueConstraint(name="unique_group_config_key", fields=("group", "key")),
        ]

    def __str__(self):
        """Return ``"<group name> - <key>"``."""
        return f"{self.group.name} - {self.key}"

140 

141 

class Rule(TimestampedModel):
    """A firewall rule owned by a group (``security_group``).

    Besides direction, protocol and port, a rule can be narrowed by a single
    group, a set of groups, a CIDR and a local CIDR; all of those are optional.
    """

    class Direction(models.TextChoices):
        INBOUND = "I", "inbound"
        OUTBOUND = "O", "outbound"

    class Protocol(models.TextChoices):
        ANY = "any", "any"
        UDP = "udp", "udp"
        TCP = "tcp", "tcp"
        ICMP = "icmp", "icmp"

    # The group this rule belongs to; reachable from Group as ``rules``.
    security_group = models.ForeignKey(
        Group,
        related_name="rules",
        on_delete=models.CASCADE,
    )

    direction = models.CharField(
        max_length=10,
        choices=Direction.choices,
        default=Direction.INBOUND,
    )

    proto = models.CharField(
        max_length=4,
        help_text="One of any, tcp, udp, or icmp",
        choices=Protocol.choices,
        default=Protocol.ANY,
    )

    # Free-form text: the accepted syntax is described in the help text and
    # is not validated by this field.
    port = models.CharField(
        max_length=255,
        default="any",
        help_text=(
            "Takes 0 or any as any, a single number (e.g. 80), a range (e.g. 200-901), "
            "or fragment to match second and further fragments of fragmented packets "
            "(since there is no port available)."
        ),
    )

    # Optional single group the rule matches on.
    group = models.ForeignKey(
        Group,
        null=True,
        blank=True,
        related_name="fw_groups",
        on_delete=models.CASCADE,
        help_text="Can be any or a literal group name, ie default-group",
    )

    # NOTE(review): "fw_groupss" (double "s") looks like a typo, but it is the
    # reverse accessor name on Group, so it is kept verbatim.
    groups = models.ManyToManyField(
        Group,
        related_name="fw_groupss",
        blank=True,
        help_text=(
            "Same as group but accepts multiple values. Multiple values are AND'd together "
            "and a certificate must contain all groups to pass."
        ),
    )

    cidr = models.CharField(
        max_length=255,
        null=True,
        blank=True,
        help_text="a CIDR, 0.0.0.0/0 is any. This restricts which Nebula IP addresses the rule allows.",
    )

    local_cidr = models.CharField(
        max_length=255,
        null=True,
        blank=True,
        help_text=(
            "a local CIDR, 0.0.0.0/0 is any. This restricts which destination IP addresses, "
            "when using unsafe_routes, the rule allows. If unset, the rule will allow access "
            "to the specified ports on both the node itself as well as any IP addresses it routes to."
        ),
    )

214 

215 

class Host(TimestampedModel):
    """A host enrolled in a network, with its role flags, keys and CLI state."""

    class Meta:
        # Host names and assigned IPs must each be unique within a network.
        unique_together = (("network", "name"), ("network", "assigned_ip"))

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)
    assigned_ip = models.CharField(max_length=200, blank=True, null=True)

    is_lighthouse = models.BooleanField(default=False)
    public_ip_or_hostname = models.CharField(max_length=200, blank=True, null=True)

    is_relay = models.BooleanField(default=False)
    use_relay = models.BooleanField(default=True)

    public_key = models.TextField(max_length=1000, blank=True, null=True)
    public_auth_kid = models.CharField(max_length=200, blank=True, null=True)
    public_auth_key = models.TextField(max_length=1000, blank=True, null=True)
    groups = models.ManyToManyField(Group, blank=True)
    interface = models.CharField(max_length=200, default="nebula1")

    # Read by ``is_config_stale``; empty until a refresh has been recorded.
    last_config_refresh = models.DateTimeField(blank=True, null=True)

    config_freeze = models.BooleanField(
        default=False,
        help_text="When true, host will not receive automatic config updates",
    )

    is_ephemeral = models.BooleanField(
        default=False,
        help_text="When true, this host will be removed if offline for over 10 minutes",
    )
    cli_version = models.CharField(
        max_length=50, blank=True, help_text="Client CLI version"
    )
    upgrade_requested = models.BooleanField(default=False)

    def __str__(self):
        """Return the host name."""
        return self.name

    @property
    def is_config_stale(self):
        """Return True when no refresh is recorded or the last one is over 24 hours old."""
        if not self.last_config_refresh:
            return True

        stale_threshold = timezone.now() - timedelta(hours=24)
        return self.last_config_refresh < stale_threshold

    @property
    def is_cli_version_outdated(self) -> bool | str:
        """Compare ``cli_version`` with the latest published version.

        Returns:
            ``True``/``False`` when both versions are known (plain string
            inequality, not a semantic version comparison), or the string
            ``"Unknown"`` when either side is unavailable. Callers must not
            treat the result as a plain boolean: ``"Unknown"`` is truthy.
        """
        if not self.cli_version:
            return "Unknown"

        latest_version = self.get_latest_cli_version
        if not latest_version:
            return "Unknown"

        return self.cli_version != latest_version

    @property
    def get_latest_cli_version(self) -> str | None:
        """Fetch the latest ``meshadmin`` version from the PyPI JSON API.

        This performs a blocking HTTP request on every access; nothing is
        cached here. The lookup is best effort: any failure is logged and
        ``None`` is returned instead of raising.
        """
        try:
            # Explicit timeout so a slow PyPI cannot stall the caller
            # indefinitely if client defaults ever change.
            response = httpx.get("https://pypi.org/pypi/meshadmin/json", timeout=5.0)
            if response.status_code == 200:
                return response.json()["info"]["version"]
            logger.warning(
                "unexpected status fetching latest cli version",
                status_code=response.status_code,
            )
        except Exception as e:
            # Deliberately broad: the version check must never break the
            # caller. The failure is logged rather than silently discarded.
            logger.warning("failed to fetch latest cli version", error=e)
        return None

283 

284 

class HostCert(TimestampedModel):
    """A certificate stored for a host, together with the CA it references."""

    host = models.ForeignKey(Host, on_delete=models.CASCADE)
    ca = models.ForeignKey(CA, on_delete=models.CASCADE)
    cert = models.TextField(max_length=1000)
    # NOTE(review): what this integer is a hash of is not visible in this
    # module; confirm against the code that writes it.
    hash = models.IntegerField(default=0)

290 

291 

class HostConfig(TimestampedModel):
    """A configuration text stored for a host, with an optional digest."""

    host = models.ForeignKey(Host, on_delete=models.CASCADE)
    config = models.TextField()
    # Presumably the SHA-256 hex digest of ``config``; it is not computed in
    # this module, so verify against the writer.
    sha256 = models.CharField(max_length=200, blank=True, null=True)

296 

297 

class Template(TimestampedModel):
    """Enrollment template: host defaults plus an enrollment key and its limits."""

    name = models.CharField(max_length=200)
    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    is_lighthouse = models.BooleanField(default=False)
    is_relay = models.BooleanField(default=False)
    use_relay = models.BooleanField(default=True)
    groups = models.ManyToManyField(Group, blank=True)
    # ``uuid4`` is passed as a callable, so every new template gets a fresh key.
    enrollment_key = models.CharField(max_length=255, unique=True, default=uuid4)

    reusable = models.BooleanField(
        help_text="When false, this key can not be used multiple times",
        default=True,
    )
    usage_limit = models.IntegerField(
        blank=True,
        null=True,
        help_text="Maximum number of peers that can enroll with this key. Null means unlimited.",
    )
    expires_at = models.DateTimeField(
        blank=True,
        null=True,
        help_text="When this key expires. Null means no expiration.",
    )
    usage_count = models.IntegerField(
        help_text="Number of times this key has been used",
        default=0,
    )
    ephemeral_peers = models.BooleanField(
        help_text="When true, peers that are offline for over 10 minutes will be removed",
        default=False,
    )

    def __str__(self):
        """Return the template name."""
        return self.name

330 

331 

class ConfigRollout(TimestampedModel):
    """A named configuration rollout over a set of hosts in one network."""

    # Allowed ``status`` values as (stored value, label) pairs.
    STATUS_CHOICES = [
        ("PENDING", "Pending"),
        ("IN_PROGRESS", "In Progress"),
        ("COMPLETED", "Completed"),
        ("FAILED", "Failed"),
    ]

    name = models.CharField(max_length=200)
    status = models.CharField(
        max_length=20,
        choices=STATUS_CHOICES,
        default="PENDING",
    )
    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    # Two independent host sets; reachable from Host as ``pending_rollouts``
    # and ``completed_rollouts`` respectively.
    target_hosts = models.ManyToManyField(Host, related_name="pending_rollouts")
    completed_hosts = models.ManyToManyField(Host, related_name="completed_rollouts")
    notes = models.TextField(blank=True)