Coverage for src/meshadmin/server/networks/models.py: 92%
168 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-07 19:26 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-07 19:26 +0200
1from datetime import datetime, timedelta
2from uuid import uuid4
4import httpx
5import structlog
6from django.contrib.auth import get_user_model
7from django.db import models
8from django.db.models import UniqueConstraint
9from django.utils import timezone
11User = get_user_model()
13logger = structlog.get_logger(__name__)
16class TimestampedModel(models.Model):
17 created_at = models.DateTimeField(auto_now_add=True)
18 updated_at = models.DateTimeField(auto_now=True)
20 class Meta:
21 abstract = True
24class NetworkMembership(TimestampedModel):
25 class Role(models.TextChoices):
26 ADMIN = "ADMIN", "Admin"
27 MEMBER = "MEMBER", "Member"
29 network = models.ForeignKey(
30 "Network", on_delete=models.CASCADE, related_name="memberships"
31 )
32 user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="memberships")
33 role = models.CharField(max_length=20, choices=Role.choices, default=Role.MEMBER)
35 class Meta:
36 constraints = [
37 UniqueConstraint(
38 fields=("network", "user"), name="unique_network_membership"
39 ),
40 ]
42 def __str__(self):
43 return f"{self.user} - {self.network} {self.role}"
46class Network(TimestampedModel):
47 name = models.CharField(max_length=200, unique=True)
48 cidr = models.CharField(max_length=200, default="100.100.64.0/24")
49 update_interval = models.IntegerField(
50 default=5, help_text="Interval in seconds for host configuration updates"
51 )
52 members = models.ManyToManyField(
53 User, through="NetworkMembership", related_name="networks"
54 )
56 def __str__(self):
57 return self.name
60class CA(TimestampedModel):
61 network = models.ForeignKey(Network, on_delete=models.CASCADE)
62 name = models.CharField(max_length=200)
64 key = models.TextField()
65 cert = models.TextField()
66 cert_print = models.JSONField(blank=True, null=True)
68 def __str__(self):
69 return self.name
71 @property
72 def days_until_expiry(self):
73 if (
74 not self.cert_print
75 or "details" not in self.cert_print
76 or "notAfter" not in self.cert_print["details"]
77 ):
78 return None
80 expiry_str = self.cert_print["details"]["notAfter"]
81 try:
82 expiry_date = datetime.fromisoformat(expiry_str)
83 days = (expiry_date - timezone.now()).days
84 return max(0, days)
85 except (ValueError, TypeError) as e:
86 logger.error("error parsing expiry date", ca_name=self.name, error=e)
87 return None
90class SigningCA(TimestampedModel):
91 network = models.OneToOneField(Network, on_delete=models.CASCADE)
92 ca = models.ForeignKey(CA, on_delete=models.CASCADE)
94 def __str__(self):
95 return self.ca.name
98class Group(TimestampedModel):
99 network = models.ForeignKey(Network, on_delete=models.CASCADE)
100 name = models.CharField(max_length=200)
101 description = models.TextField(blank=True)
103 class Meta:
104 constraints = [
105 UniqueConstraint(
106 fields=("network", "name"), name="unique_group_name_per_network"
107 ),
108 ]
110 def __str__(self):
111 return self.name
114class GroupConfig(TimestampedModel):
115 CONFIG_KEY_CHOICES = [
116 ("lighthouse.dns.host", "Lighthouse DNS Host"),
117 ("lighthouse.dns.port", "Lighthouse DNS Port"),
118 ("lighthouse.serve_dns", "Lighthouse Serve DNS"),
119 ("listen.host", "Listen Host"),
120 ("listen.port", "Listen Port"),
121 ("punchy.punch", "Punchy Punch"),
122 ("punchy.delay", "Punchy Delay"),
123 ("punchy.respond", "Punchy Respond"),
124 ("punchy.respond_delay", "Punchy Respond Delay"),
125 ("stats.type", "Stats Type"),
126 ("stats.prefix", "Stats Prefix"),
127 ("stats.protocol", "Stats Protocol"),
128 ("stats.host", "Stats Host"),
129 ("stats.interval", "Stats Interval"),
130 ("stats.path", "Stats Path"),
131 ("stats.namespace", "Stats Namespace"),
132 ("stats.subsystem", "Stats Subsystem"),
133 ("stats.message_metrics", "Stats Message Metrics"),
134 ("stats.lighthouse_metrics", "Stats Lighthouse Metrics"),
135 ]
137 group = models.ForeignKey(
138 Group, on_delete=models.CASCADE, related_name="config_overrides"
139 )
140 key = models.CharField(
141 max_length=255,
142 choices=CONFIG_KEY_CHOICES,
143 help_text="Configuration property to override or add.",
144 )
145 value = models.TextField(
146 help_text="The value to override the config property with."
147 )
149 class Meta:
150 constraints = [
151 UniqueConstraint(fields=("group", "key"), name="unique_group_config_key"),
152 ]
154 def __str__(self):
155 return f"{self.group.name} - {self.key}"
158class Rule(TimestampedModel):
159 class Direction(models.TextChoices):
160 INBOUND = "I", "inbound"
161 OUTBOUND = "O", "outbound"
163 security_group = models.ForeignKey(
164 Group, on_delete=models.CASCADE, related_name="rules"
165 )
167 direction = models.CharField(
168 max_length=10, choices=Direction.choices, default=Direction.INBOUND
169 )
171 class Protocol(models.TextChoices):
172 ANY = "any", "any"
173 UDP = "udp", "udp"
174 TCP = "tcp", "tcp"
175 ICMP = "icmp", "icmp"
177 proto = models.CharField(
178 max_length=4,
179 choices=Protocol.choices,
180 default=Protocol.ANY,
181 help_text="One of any, tcp, udp, or icmp",
182 )
184 port = models.CharField(
185 max_length=255,
186 help_text=(
187 "Takes 0 or any as any, a single number (e.g. 80), a range (e.g. 200-901), "
188 "or fragment to match second and further fragments of fragmented packets "
189 "(since there is no port available)."
190 ),
191 default="any",
192 )
194 group = models.ForeignKey(
195 Group,
196 on_delete=models.CASCADE,
197 help_text="Can be any or a literal group name, ie default-group",
198 blank=True,
199 null=True,
200 related_name="fw_groups",
201 )
203 groups = models.ManyToManyField(
204 Group,
205 blank=True,
206 related_name="fw_groupss",
207 help_text=(
208 "Same as group but accepts multiple values. Multiple values are AND'd together "
209 "and a certificate must contain all groups to pass."
210 ),
211 )
213 cidr = models.CharField(
214 max_length=255,
215 help_text="a CIDR, 0.0.0.0/0 is any. This restricts which Nebula IP addresses the rule allows.",
216 blank=True,
217 null=True,
218 )
220 local_cidr = models.CharField(
221 max_length=255,
222 help_text=(
223 "a local CIDR, 0.0.0.0/0 is any. This restricts which destination IP addresses, "
224 "when using unsafe_routes, the rule allows. If unset, the rule will allow access "
225 "to the specified ports on both the node itself as well as any IP addresses it routes to."
226 ),
227 blank=True,
228 null=True,
229 )
232class Host(TimestampedModel):
233 class Meta:
234 unique_together = (("network", "name"), ("network", "assigned_ip"))
236 network = models.ForeignKey(Network, on_delete=models.CASCADE)
237 name = models.CharField(max_length=200)
238 assigned_ip = models.CharField(max_length=200, blank=True, null=True)
240 is_lighthouse = models.BooleanField(default=False)
241 public_ip_or_hostname = models.CharField(max_length=200, blank=True, null=True)
243 is_relay = models.BooleanField(default=False)
244 use_relay = models.BooleanField(default=True)
246 public_key = models.TextField(max_length=1000, blank=True, null=True)
247 public_auth_kid = models.CharField(max_length=200, blank=True, null=True)
248 public_auth_key = models.TextField(max_length=1000, blank=True, null=True)
249 groups = models.ManyToManyField(Group, blank=True)
250 interface = models.CharField(max_length=200, default="nebula1")
252 last_config_refresh = models.DateTimeField(blank=True, null=True)
254 config_freeze = models.BooleanField(
255 default=False,
256 help_text="When true, host will not receive automatic config updates",
257 )
259 is_ephemeral = models.BooleanField(
260 default=False,
261 help_text="When true, this host will be removed if offline for over 10 minutes",
262 )
263 cli_version = models.CharField(
264 max_length=50, blank=True, help_text="Client CLI version"
265 )
266 upgrade_requested = models.BooleanField(default=False)
268 def __str__(self):
269 return self.name
271 @property
272 def is_config_stale(self):
273 if not self.last_config_refresh:
274 return True
276 stale_threshold = timezone.now() - timedelta(hours=24)
277 return self.last_config_refresh < stale_threshold
279 @property
280 def is_cli_version_outdated(self) -> bool | str:
281 if not self.cli_version:
282 return "Unknown"
284 latest_version = self.get_latest_cli_version
285 if not latest_version:
286 return "Unknown"
288 return self.cli_version != latest_version
290 @property
291 def get_latest_cli_version(self) -> str | None:
292 try:
293 response = httpx.get("https://pypi.org/pypi/meshadmin/json")
294 if response.status_code == 200:
295 return response.json()["info"]["version"]
296 except Exception:
297 pass
298 return None
301class HostCert(TimestampedModel):
302 host = models.ForeignKey(Host, on_delete=models.CASCADE)
303 ca = models.ForeignKey(CA, on_delete=models.CASCADE)
304 cert = models.TextField(max_length=1000)
305 hash = models.IntegerField(default=0)
308class HostConfig(TimestampedModel):
309 host = models.ForeignKey(Host, on_delete=models.CASCADE)
310 config = models.TextField()
311 sha256 = models.CharField(max_length=200, blank=True, null=True)
314class Template(TimestampedModel):
315 name = models.CharField(max_length=200)
316 network = models.ForeignKey(Network, on_delete=models.CASCADE)
317 is_lighthouse = models.BooleanField(default=False)
318 is_relay = models.BooleanField(default=False)
319 use_relay = models.BooleanField(default=True)
320 groups = models.ManyToManyField(Group, blank=True)
321 enrollment_key = models.CharField(max_length=255, default=uuid4, unique=True)
323 reusable = models.BooleanField(
324 default=True, help_text="When false, this key can not be used multiple times"
325 )
326 usage_limit = models.IntegerField(
327 null=True,
328 blank=True,
329 help_text="Maximum number of peers that can enroll with this key. Null means unlimited.",
330 )
331 expires_at = models.DateTimeField(
332 null=True,
333 blank=True,
334 help_text="When this key expires. Null means no expiration.",
335 )
336 usage_count = models.IntegerField(
337 default=0, help_text="Number of times this key has been used"
338 )
339 ephemeral_peers = models.BooleanField(
340 default=False,
341 help_text="When true, peers that are offline for over 10 minutes will be removed",
342 )
344 def __str__(self):
345 return self.name
348class ConfigRollout(TimestampedModel):
349 name = models.CharField(max_length=200)
350 status = models.CharField(
351 max_length=20,
352 choices=[
353 ("PENDING", "Pending"),
354 ("IN_PROGRESS", "In Progress"),
355 ("COMPLETED", "Completed"),
356 ("FAILED", "Failed"),
357 ],
358 default="PENDING",
359 )
360 network = models.ForeignKey(Network, on_delete=models.CASCADE)
361 target_hosts = models.ManyToManyField(Host, related_name="pending_rollouts")
362 completed_hosts = models.ManyToManyField(Host, related_name="completed_rollouts")
363 notes = models.TextField(blank=True)