Coverage for src/meshadmin/server/networks/models.py: 92%

168 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-07 19:26 +0200

1from datetime import datetime, timedelta 

2from uuid import uuid4 

3 

4import httpx 

5import structlog 

6from django.contrib.auth import get_user_model 

7from django.db import models 

8from django.db.models import UniqueConstraint 

9from django.utils import timezone 

10 

11User = get_user_model() 

12 

13logger = structlog.get_logger(__name__) 

14 

15 

16class TimestampedModel(models.Model): 

17 created_at = models.DateTimeField(auto_now_add=True) 

18 updated_at = models.DateTimeField(auto_now=True) 

19 

20 class Meta: 

21 abstract = True 

22 

23 

24class NetworkMembership(TimestampedModel): 

25 class Role(models.TextChoices): 

26 ADMIN = "ADMIN", "Admin" 

27 MEMBER = "MEMBER", "Member" 

28 

29 network = models.ForeignKey( 

30 "Network", on_delete=models.CASCADE, related_name="memberships" 

31 ) 

32 user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="memberships") 

33 role = models.CharField(max_length=20, choices=Role.choices, default=Role.MEMBER) 

34 

35 class Meta: 

36 constraints = [ 

37 UniqueConstraint( 

38 fields=("network", "user"), name="unique_network_membership" 

39 ), 

40 ] 

41 

42 def __str__(self): 

43 return f"{self.user} - {self.network} {self.role}" 

44 

45 

46class Network(TimestampedModel): 

47 name = models.CharField(max_length=200, unique=True) 

48 cidr = models.CharField(max_length=200, default="100.100.64.0/24") 

49 update_interval = models.IntegerField( 

50 default=5, help_text="Interval in seconds for host configuration updates" 

51 ) 

52 members = models.ManyToManyField( 

53 User, through="NetworkMembership", related_name="networks" 

54 ) 

55 

56 def __str__(self): 

57 return self.name 

58 

59 

60class CA(TimestampedModel): 

61 network = models.ForeignKey(Network, on_delete=models.CASCADE) 

62 name = models.CharField(max_length=200) 

63 

64 key = models.TextField() 

65 cert = models.TextField() 

66 cert_print = models.JSONField(blank=True, null=True) 

67 

68 def __str__(self): 

69 return self.name 

70 

71 @property 

72 def days_until_expiry(self): 

73 if ( 

74 not self.cert_print 

75 or "details" not in self.cert_print 

76 or "notAfter" not in self.cert_print["details"] 

77 ): 

78 return None 

79 

80 expiry_str = self.cert_print["details"]["notAfter"] 

81 try: 

82 expiry_date = datetime.fromisoformat(expiry_str) 

83 days = (expiry_date - timezone.now()).days 

84 return max(0, days) 

85 except (ValueError, TypeError) as e: 

86 logger.error("error parsing expiry date", ca_name=self.name, error=e) 

87 return None 

88 

89 

90class SigningCA(TimestampedModel): 

91 network = models.OneToOneField(Network, on_delete=models.CASCADE) 

92 ca = models.ForeignKey(CA, on_delete=models.CASCADE) 

93 

94 def __str__(self): 

95 return self.ca.name 

96 

97 

98class Group(TimestampedModel): 

99 network = models.ForeignKey(Network, on_delete=models.CASCADE) 

100 name = models.CharField(max_length=200) 

101 description = models.TextField(blank=True) 

102 

103 class Meta: 

104 constraints = [ 

105 UniqueConstraint( 

106 fields=("network", "name"), name="unique_group_name_per_network" 

107 ), 

108 ] 

109 

110 def __str__(self): 

111 return self.name 

112 

113 

114class GroupConfig(TimestampedModel): 

115 CONFIG_KEY_CHOICES = [ 

116 ("lighthouse.dns.host", "Lighthouse DNS Host"), 

117 ("lighthouse.dns.port", "Lighthouse DNS Port"), 

118 ("lighthouse.serve_dns", "Lighthouse Serve DNS"), 

119 ("listen.host", "Listen Host"), 

120 ("listen.port", "Listen Port"), 

121 ("punchy.punch", "Punchy Punch"), 

122 ("punchy.delay", "Punchy Delay"), 

123 ("punchy.respond", "Punchy Respond"), 

124 ("punchy.respond_delay", "Punchy Respond Delay"), 

125 ("stats.type", "Stats Type"), 

126 ("stats.prefix", "Stats Prefix"), 

127 ("stats.protocol", "Stats Protocol"), 

128 ("stats.host", "Stats Host"), 

129 ("stats.interval", "Stats Interval"), 

130 ("stats.path", "Stats Path"), 

131 ("stats.namespace", "Stats Namespace"), 

132 ("stats.subsystem", "Stats Subsystem"), 

133 ("stats.message_metrics", "Stats Message Metrics"), 

134 ("stats.lighthouse_metrics", "Stats Lighthouse Metrics"), 

135 ] 

136 

137 group = models.ForeignKey( 

138 Group, on_delete=models.CASCADE, related_name="config_overrides" 

139 ) 

140 key = models.CharField( 

141 max_length=255, 

142 choices=CONFIG_KEY_CHOICES, 

143 help_text="Configuration property to override or add.", 

144 ) 

145 value = models.TextField( 

146 help_text="The value to override the config property with." 

147 ) 

148 

149 class Meta: 

150 constraints = [ 

151 UniqueConstraint(fields=("group", "key"), name="unique_group_config_key"), 

152 ] 

153 

154 def __str__(self): 

155 return f"{self.group.name} - {self.key}" 

156 

157 

158class Rule(TimestampedModel): 

159 class Direction(models.TextChoices): 

160 INBOUND = "I", "inbound" 

161 OUTBOUND = "O", "outbound" 

162 

163 security_group = models.ForeignKey( 

164 Group, on_delete=models.CASCADE, related_name="rules" 

165 ) 

166 

167 direction = models.CharField( 

168 max_length=10, choices=Direction.choices, default=Direction.INBOUND 

169 ) 

170 

171 class Protocol(models.TextChoices): 

172 ANY = "any", "any" 

173 UDP = "udp", "udp" 

174 TCP = "tcp", "tcp" 

175 ICMP = "icmp", "icmp" 

176 

177 proto = models.CharField( 

178 max_length=4, 

179 choices=Protocol.choices, 

180 default=Protocol.ANY, 

181 help_text="One of any, tcp, udp, or icmp", 

182 ) 

183 

184 port = models.CharField( 

185 max_length=255, 

186 help_text=( 

187 "Takes 0 or any as any, a single number (e.g. 80), a range (e.g. 200-901), " 

188 "or fragment to match second and further fragments of fragmented packets " 

189 "(since there is no port available)." 

190 ), 

191 default="any", 

192 ) 

193 

194 group = models.ForeignKey( 

195 Group, 

196 on_delete=models.CASCADE, 

197 help_text="Can be any or a literal group name, ie default-group", 

198 blank=True, 

199 null=True, 

200 related_name="fw_groups", 

201 ) 

202 

203 groups = models.ManyToManyField( 

204 Group, 

205 blank=True, 

206 related_name="fw_groupss", 

207 help_text=( 

208 "Same as group but accepts multiple values. Multiple values are AND'd together " 

209 "and a certificate must contain all groups to pass." 

210 ), 

211 ) 

212 

213 cidr = models.CharField( 

214 max_length=255, 

215 help_text="a CIDR, 0.0.0.0/0 is any. This restricts which Nebula IP addresses the rule allows.", 

216 blank=True, 

217 null=True, 

218 ) 

219 

220 local_cidr = models.CharField( 

221 max_length=255, 

222 help_text=( 

223 "a local CIDR, 0.0.0.0/0 is any. This restricts which destination IP addresses, " 

224 "when using unsafe_routes, the rule allows. If unset, the rule will allow access " 

225 "to the specified ports on both the node itself as well as any IP addresses it routes to." 

226 ), 

227 blank=True, 

228 null=True, 

229 ) 

230 

231 

232class Host(TimestampedModel): 

233 class Meta: 

234 unique_together = (("network", "name"), ("network", "assigned_ip")) 

235 

236 network = models.ForeignKey(Network, on_delete=models.CASCADE) 

237 name = models.CharField(max_length=200) 

238 assigned_ip = models.CharField(max_length=200, blank=True, null=True) 

239 

240 is_lighthouse = models.BooleanField(default=False) 

241 public_ip_or_hostname = models.CharField(max_length=200, blank=True, null=True) 

242 

243 is_relay = models.BooleanField(default=False) 

244 use_relay = models.BooleanField(default=True) 

245 

246 public_key = models.TextField(max_length=1000, blank=True, null=True) 

247 public_auth_kid = models.CharField(max_length=200, blank=True, null=True) 

248 public_auth_key = models.TextField(max_length=1000, blank=True, null=True) 

249 groups = models.ManyToManyField(Group, blank=True) 

250 interface = models.CharField(max_length=200, default="nebula1") 

251 

252 last_config_refresh = models.DateTimeField(blank=True, null=True) 

253 

254 config_freeze = models.BooleanField( 

255 default=False, 

256 help_text="When true, host will not receive automatic config updates", 

257 ) 

258 

259 is_ephemeral = models.BooleanField( 

260 default=False, 

261 help_text="When true, this host will be removed if offline for over 10 minutes", 

262 ) 

263 cli_version = models.CharField( 

264 max_length=50, blank=True, help_text="Client CLI version" 

265 ) 

266 upgrade_requested = models.BooleanField(default=False) 

267 

268 def __str__(self): 

269 return self.name 

270 

271 @property 

272 def is_config_stale(self): 

273 if not self.last_config_refresh: 

274 return True 

275 

276 stale_threshold = timezone.now() - timedelta(hours=24) 

277 return self.last_config_refresh < stale_threshold 

278 

279 @property 

280 def is_cli_version_outdated(self) -> bool | str: 

281 if not self.cli_version: 

282 return "Unknown" 

283 

284 latest_version = self.get_latest_cli_version 

285 if not latest_version: 

286 return "Unknown" 

287 

288 return self.cli_version != latest_version 

289 

290 @property 

291 def get_latest_cli_version(self) -> str | None: 

292 try: 

293 response = httpx.get("https://pypi.org/pypi/meshadmin/json") 

294 if response.status_code == 200: 

295 return response.json()["info"]["version"] 

296 except Exception: 

297 pass 

298 return None 

299 

300 

301class HostCert(TimestampedModel): 

302 host = models.ForeignKey(Host, on_delete=models.CASCADE) 

303 ca = models.ForeignKey(CA, on_delete=models.CASCADE) 

304 cert = models.TextField(max_length=1000) 

305 hash = models.IntegerField(default=0) 

306 

307 

308class HostConfig(TimestampedModel): 

309 host = models.ForeignKey(Host, on_delete=models.CASCADE) 

310 config = models.TextField() 

311 sha256 = models.CharField(max_length=200, blank=True, null=True) 

312 

313 

314class Template(TimestampedModel): 

315 name = models.CharField(max_length=200) 

316 network = models.ForeignKey(Network, on_delete=models.CASCADE) 

317 is_lighthouse = models.BooleanField(default=False) 

318 is_relay = models.BooleanField(default=False) 

319 use_relay = models.BooleanField(default=True) 

320 groups = models.ManyToManyField(Group, blank=True) 

321 enrollment_key = models.CharField(max_length=255, default=uuid4, unique=True) 

322 

323 reusable = models.BooleanField( 

324 default=True, help_text="When false, this key can not be used multiple times" 

325 ) 

326 usage_limit = models.IntegerField( 

327 null=True, 

328 blank=True, 

329 help_text="Maximum number of peers that can enroll with this key. Null means unlimited.", 

330 ) 

331 expires_at = models.DateTimeField( 

332 null=True, 

333 blank=True, 

334 help_text="When this key expires. Null means no expiration.", 

335 ) 

336 usage_count = models.IntegerField( 

337 default=0, help_text="Number of times this key has been used" 

338 ) 

339 ephemeral_peers = models.BooleanField( 

340 default=False, 

341 help_text="When true, peers that are offline for over 10 minutes will be removed", 

342 ) 

343 

344 def __str__(self): 

345 return self.name 

346 

347 

348class ConfigRollout(TimestampedModel): 

349 name = models.CharField(max_length=200) 

350 status = models.CharField( 

351 max_length=20, 

352 choices=[ 

353 ("PENDING", "Pending"), 

354 ("IN_PROGRESS", "In Progress"), 

355 ("COMPLETED", "Completed"), 

356 ("FAILED", "Failed"), 

357 ], 

358 default="PENDING", 

359 ) 

360 network = models.ForeignKey(Network, on_delete=models.CASCADE) 

361 target_hosts = models.ManyToManyField(Host, related_name="pending_rollouts") 

362 completed_hosts = models.ManyToManyField(Host, related_name="completed_rollouts") 

363 notes = models.TextField(blank=True)