Coverage for src/meshadmin/server/networks/models.py: 92%

169 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-09 15:09 +0200

1from datetime import datetime, timedelta 

2from uuid import uuid4 

3 

4import httpx 

5import structlog 

6from django.contrib.auth import get_user_model 

7from django.db import models 

8from django.db.models import UniqueConstraint 

9from django.utils import timezone 

10 

11User = get_user_model() 

12 

13logger = structlog.get_logger(__name__) 

14 

15 

16class TimestampedModel(models.Model): 

17 created_at = models.DateTimeField(auto_now_add=True) 

18 updated_at = models.DateTimeField(auto_now=True) 

19 

20 class Meta: 

21 abstract = True 

22 

23 

24class NetworkMembership(TimestampedModel): 

25 class Role(models.TextChoices): 

26 ADMIN = "ADMIN", "Admin" 

27 MEMBER = "MEMBER", "Member" 

28 

29 network = models.ForeignKey( 

30 "Network", on_delete=models.CASCADE, related_name="memberships" 

31 ) 

32 user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="memberships") 

33 role = models.CharField(max_length=20, choices=Role.choices, default=Role.MEMBER) 

34 

35 class Meta: 

36 constraints = [ 

37 UniqueConstraint( 

38 fields=("network", "user"), name="unique_network_membership" 

39 ), 

40 ] 

41 

42 def __str__(self): 

43 return f"{self.user} - {self.network} {self.role}" 

44 

45 

46class Network(TimestampedModel): 

47 name = models.CharField(max_length=200, unique=True) 

48 cidr = models.CharField(max_length=200, default="100.100.64.0/24") 

49 update_interval = models.IntegerField( 

50 default=5, help_text="Interval in seconds for host configuration updates" 

51 ) 

52 members = models.ManyToManyField( 

53 User, through="NetworkMembership", related_name="networks" 

54 ) 

55 

56 def __str__(self): 

57 return self.name 

58 

59 

60class CA(TimestampedModel): 

61 network = models.ForeignKey(Network, on_delete=models.CASCADE) 

62 name = models.CharField(max_length=200) 

63 

64 key = models.TextField() 

65 cert = models.TextField() 

66 cert_print = models.JSONField(blank=True, null=True) 

67 

68 def __str__(self): 

69 return self.name 

70 

71 @property 

72 def days_until_expiry(self): 

73 if ( 

74 not self.cert_print 

75 or "details" not in self.cert_print 

76 or "notAfter" not in self.cert_print["details"] 

77 ): 

78 return None 

79 

80 expiry_str = self.cert_print["details"]["notAfter"] 

81 try: 

82 expiry_date = datetime.fromisoformat(expiry_str) 

83 days = (expiry_date - timezone.now()).days 

84 return max(0, days) 

85 except (ValueError, TypeError) as e: 

86 logger.error("error parsing expiry date", ca_name=self.name, error=e) 

87 return None 

88 

89 

90class SigningCA(TimestampedModel): 

91 network = models.OneToOneField(Network, on_delete=models.CASCADE) 

92 ca = models.ForeignKey(CA, on_delete=models.CASCADE) 

93 

94 def __str__(self): 

95 return self.ca.name 

96 

97 

98class Group(TimestampedModel): 

99 network = models.ForeignKey(Network, on_delete=models.CASCADE) 

100 name = models.CharField(max_length=200) 

101 description = models.TextField(blank=True) 

102 

103 class Meta: 

104 constraints = [ 

105 UniqueConstraint( 

106 fields=("network", "name"), name="unique_group_name_per_network" 

107 ), 

108 ] 

109 

110 def __str__(self): 

111 return self.name 

112 

113 

114class GroupConfig(TimestampedModel): 

115 CONFIG_KEY_CHOICES = [ 

116 ("lighthouse.dns.host", "Lighthouse DNS Host"), 

117 ("lighthouse.dns.port", "Lighthouse DNS Port"), 

118 ("lighthouse.serve_dns", "Lighthouse Serve DNS"), 

119 ("listen.host", "Listen Host"), 

120 ("listen.port", "Listen Port"), 

121 ("punchy.punch", "Punchy Punch"), 

122 ("punchy.delay", "Punchy Delay"), 

123 ("punchy.respond", "Punchy Respond"), 

124 ("punchy.respond_delay", "Punchy Respond Delay"), 

125 ("stats.type", "Stats Type"), 

126 ("stats.prefix", "Stats Prefix"), 

127 ("stats.protocol", "Stats Protocol"), 

128 ("stats.host", "Stats Host"), 

129 ("stats.interval", "Stats Interval"), 

130 ("stats.path", "Stats Path"), 

131 ("stats.namespace", "Stats Namespace"), 

132 ("stats.subsystem", "Stats Subsystem"), 

133 ("stats.message_metrics", "Stats Message Metrics"), 

134 ("stats.lighthouse_metrics", "Stats Lighthouse Metrics"), 

135 ] 

136 

137 group = models.ForeignKey( 

138 Group, on_delete=models.CASCADE, related_name="config_overrides" 

139 ) 

140 key = models.CharField( 

141 max_length=255, 

142 choices=CONFIG_KEY_CHOICES, 

143 help_text="Configuration property to override or add.", 

144 ) 

145 value = models.TextField( 

146 help_text="The value to override the config property with." 

147 ) 

148 

149 class Meta: 

150 constraints = [ 

151 UniqueConstraint(fields=("group", "key"), name="unique_group_config_key"), 

152 ] 

153 

154 def __str__(self): 

155 return f"{self.group.name} - {self.key}" 

156 

157 

158class Rule(TimestampedModel): 

159 class Direction(models.TextChoices): 

160 INBOUND = "I", "inbound" 

161 OUTBOUND = "O", "outbound" 

162 

163 security_group = models.ForeignKey( 

164 Group, on_delete=models.CASCADE, related_name="rules" 

165 ) 

166 

167 direction = models.CharField( 

168 max_length=10, choices=Direction.choices, default=Direction.INBOUND 

169 ) 

170 

171 class Protocol(models.TextChoices): 

172 ANY = "any", "any" 

173 UDP = "udp", "udp" 

174 TCP = "tcp", "tcp" 

175 ICMP = "icmp", "icmp" 

176 

177 proto = models.CharField( 

178 max_length=4, 

179 choices=Protocol.choices, 

180 default=Protocol.ANY, 

181 help_text="One of any, tcp, udp, or icmp", 

182 ) 

183 

184 port = models.CharField( 

185 max_length=255, 

186 help_text=( 

187 "Takes 0 or any as any, a single number (e.g. 80), a range (e.g. 200-901), " 

188 "or fragment to match second and further fragments of fragmented packets " 

189 "(since there is no port available)." 

190 ), 

191 default="any", 

192 ) 

193 

194 group = models.ForeignKey( 

195 Group, 

196 on_delete=models.CASCADE, 

197 help_text="Can be any or a literal group name, ie default-group", 

198 blank=True, 

199 null=True, 

200 related_name="fw_groups", 

201 ) 

202 

203 groups = models.ManyToManyField( 

204 Group, 

205 blank=True, 

206 related_name="fw_groupss", 

207 help_text=( 

208 "Same as group but accepts multiple values. Multiple values are AND'd together " 

209 "and a certificate must contain all groups to pass." 

210 ), 

211 ) 

212 

213 cidr = models.CharField( 

214 max_length=255, 

215 help_text="a CIDR, 0.0.0.0/0 is any. This restricts which Nebula IP addresses the rule allows.", 

216 blank=True, 

217 null=True, 

218 ) 

219 

220 local_cidr = models.CharField( 

221 max_length=255, 

222 help_text=( 

223 "a local CIDR, 0.0.0.0/0 is any. This restricts which destination IP addresses, " 

224 "when using unsafe_routes, the rule allows. If unset, the rule will allow access " 

225 "to the specified ports on both the node itself as well as any IP addresses it routes to." 

226 ), 

227 blank=True, 

228 null=True, 

229 ) 

230 

231 description = models.CharField( 

232 max_length=255, 

233 blank=True, 

234 help_text="Optional description for this rule", 

235 ) 

236 

237 

238class Host(TimestampedModel): 

239 class Meta: 

240 unique_together = (("network", "name"), ("network", "assigned_ip")) 

241 

242 network = models.ForeignKey(Network, on_delete=models.CASCADE) 

243 name = models.CharField(max_length=200) 

244 assigned_ip = models.CharField(max_length=200, blank=True, null=True) 

245 

246 is_lighthouse = models.BooleanField(default=False) 

247 public_ip_or_hostname = models.CharField(max_length=200, blank=True, null=True) 

248 

249 is_relay = models.BooleanField(default=False) 

250 use_relay = models.BooleanField(default=True) 

251 

252 public_key = models.TextField(max_length=1000, blank=True, null=True) 

253 public_auth_kid = models.CharField(max_length=200, blank=True, null=True) 

254 public_auth_key = models.TextField(max_length=1000, blank=True, null=True) 

255 groups = models.ManyToManyField(Group, blank=True) 

256 interface = models.CharField(max_length=200, default="nebula1") 

257 

258 last_config_refresh = models.DateTimeField(blank=True, null=True) 

259 

260 config_freeze = models.BooleanField( 

261 default=False, 

262 help_text="When true, host will not receive automatic config updates", 

263 ) 

264 

265 is_ephemeral = models.BooleanField( 

266 default=False, 

267 help_text="When true, this host will be removed if offline for over 10 minutes", 

268 ) 

269 cli_version = models.CharField( 

270 max_length=50, blank=True, help_text="Client CLI version" 

271 ) 

272 upgrade_requested = models.BooleanField(default=False) 

273 

274 def __str__(self): 

275 return self.name 

276 

277 @property 

278 def is_config_stale(self): 

279 if not self.last_config_refresh: 

280 return True 

281 

282 stale_threshold = timezone.now() - timedelta(hours=24) 

283 return self.last_config_refresh < stale_threshold 

284 

285 @property 

286 def is_cli_version_outdated(self) -> bool | str: 

287 if not self.cli_version: 

288 return "Unknown" 

289 

290 latest_version = self.get_latest_cli_version 

291 if not latest_version: 

292 return "Unknown" 

293 

294 return self.cli_version != latest_version 

295 

296 @property 

297 def get_latest_cli_version(self) -> str | None: 

298 try: 

299 response = httpx.get("https://pypi.org/pypi/meshadmin/json") 

300 if response.status_code == 200: 

301 return response.json()["info"]["version"] 

302 except Exception: 

303 pass 

304 return None 

305 

306 

307class HostCert(TimestampedModel): 

308 host = models.ForeignKey(Host, on_delete=models.CASCADE) 

309 ca = models.ForeignKey(CA, on_delete=models.CASCADE) 

310 cert = models.TextField(max_length=1000) 

311 hash = models.IntegerField(default=0) 

312 

313 

314class HostConfig(TimestampedModel): 

315 host = models.ForeignKey(Host, on_delete=models.CASCADE) 

316 config = models.TextField() 

317 sha256 = models.CharField(max_length=200, blank=True, null=True) 

318 

319 

320class Template(TimestampedModel): 

321 name = models.CharField(max_length=200) 

322 network = models.ForeignKey(Network, on_delete=models.CASCADE) 

323 is_lighthouse = models.BooleanField(default=False) 

324 is_relay = models.BooleanField(default=False) 

325 use_relay = models.BooleanField(default=True) 

326 groups = models.ManyToManyField(Group, blank=True) 

327 enrollment_key = models.CharField(max_length=255, default=uuid4, unique=True) 

328 

329 reusable = models.BooleanField( 

330 default=True, help_text="When false, this key can not be used multiple times" 

331 ) 

332 usage_limit = models.IntegerField( 

333 null=True, 

334 blank=True, 

335 help_text="Maximum number of peers that can enroll with this key. Null means unlimited.", 

336 ) 

337 expires_at = models.DateTimeField( 

338 null=True, 

339 blank=True, 

340 help_text="When this key expires. Null means no expiration.", 

341 ) 

342 usage_count = models.IntegerField( 

343 default=0, help_text="Number of times this key has been used" 

344 ) 

345 ephemeral_peers = models.BooleanField( 

346 default=False, 

347 help_text="When true, peers that are offline for over 10 minutes will be removed", 

348 ) 

349 

350 def __str__(self): 

351 return self.name 

352 

353 

354class ConfigRollout(TimestampedModel): 

355 name = models.CharField(max_length=200) 

356 status = models.CharField( 

357 max_length=20, 

358 choices=[ 

359 ("PENDING", "Pending"), 

360 ("IN_PROGRESS", "In Progress"), 

361 ("COMPLETED", "Completed"), 

362 ("FAILED", "Failed"), 

363 ], 

364 default="PENDING", 

365 ) 

366 network = models.ForeignKey(Network, on_delete=models.CASCADE) 

367 target_hosts = models.ManyToManyField(Host, related_name="pending_rollouts") 

368 completed_hosts = models.ManyToManyField(Host, related_name="completed_rollouts") 

369 notes = models.TextField(blank=True)