Coverage for src/meshadmin/server/networks/models.py: 92%
169 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-09 15:09 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-09 15:09 +0200
1from datetime import datetime, timedelta
2from uuid import uuid4
4import httpx
5import structlog
6from django.contrib.auth import get_user_model
7from django.db import models
8from django.db.models import UniqueConstraint
9from django.utils import timezone
# User model class returned by Django's get_user_model(); used as the target of
# the user relations declared further down in this module.
User = get_user_model()

# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
class TimestampedModel(models.Model):
    """Abstract base model adding creation and modification timestamps."""

    # auto_now_add: filled in when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)
    # auto_now: refreshed each time the instance is saved.
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True
class NetworkMembership(TimestampedModel):
    """Membership of a user in a network, together with the user's role.

    A (network, user) pair may exist only once.
    """

    class Role(models.TextChoices):
        ADMIN = "ADMIN", "Admin"
        MEMBER = "MEMBER", "Member"

    network = models.ForeignKey(
        "Network",
        on_delete=models.CASCADE,
        related_name="memberships",
    )
    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name="memberships",
    )
    role = models.CharField(
        max_length=20,
        choices=Role.choices,
        default=Role.MEMBER,
    )

    class Meta:
        constraints = [
            UniqueConstraint(
                fields=("network", "user"),
                name="unique_network_membership",
            ),
        ]

    def __str__(self):
        """Return "<user> - <network> <role>"."""
        return f"{self.user} - {self.network} {self.role}"
class Network(TimestampedModel):
    """A uniquely named network with a CIDR range and a set of member users."""

    name = models.CharField(max_length=200, unique=True)
    cidr = models.CharField(max_length=200, default="100.100.64.0/24")
    update_interval = models.IntegerField(
        default=5,
        help_text="Interval in seconds for host configuration updates",
    )
    # Users are attached through NetworkMembership, which also stores their role.
    members = models.ManyToManyField(
        User,
        through="NetworkMembership",
        related_name="networks",
    )

    def __str__(self):
        """Return the network name."""
        return self.name
class CA(TimestampedModel):
    """Certificate authority that belongs to a network."""

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)

    key = models.TextField()
    cert = models.TextField()
    # JSON description of the certificate; days_until_expiry reads
    # cert_print["details"]["notAfter"] from it.
    cert_print = models.JSONField(blank=True, null=True)

    def __str__(self):
        """Return the CA name."""
        return self.name

    @staticmethod
    def _parse_not_after(value):
        """Parse an ISO 8601 timestamp string into a datetime.

        Retries with an explicit "+00:00" offset when the string ends in "Z",
        because datetime.fromisoformat only accepts the "Z" designator from
        Python 3.11 onwards.

        Raises:
            ValueError: if the string is not a valid ISO 8601 timestamp.
            TypeError: if ``value`` is not a string.
        """
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            if not value.endswith("Z"):
                raise
            return datetime.fromisoformat(f"{value[:-1]}+00:00")

    @property
    def days_until_expiry(self):
        """Return the whole days left until the certificate expires.

        Returns:
            The number of days until ``cert_print["details"]["notAfter"]``,
            floored at 0 for certificates that have already expired, or None
            when the expiry date is missing or cannot be parsed.
        """
        from datetime import timezone as datetime_timezone

        cert_print = self.cert_print
        # Guard against missing or unexpectedly shaped JSON (e.g. a list, or
        # {"details": null}) instead of raising from the membership tests.
        if not isinstance(cert_print, dict):
            return None
        details = cert_print.get("details")
        if not isinstance(details, dict) or "notAfter" not in details:
            return None

        expiry_str = details["notAfter"]
        try:
            expiry_date = self._parse_not_after(expiry_str)
            now = timezone.now()
            # A timestamp without an offset cannot be subtracted from an aware
            # "now". NOTE(review): such values are assumed to be UTC - confirm
            # against whatever produces cert_print.
            if timezone.is_naive(expiry_date) and timezone.is_aware(now):
                expiry_date = expiry_date.replace(tzinfo=datetime_timezone.utc)
            return max(0, (expiry_date - now).days)
        except (ValueError, TypeError) as e:
            logger.error("error parsing expiry date", ca_name=self.name, error=e)
            return None
class SigningCA(TimestampedModel):
    """Associates a network with one CA; each network has at most one row."""

    # OneToOneField: a network can appear in only one SigningCA row.
    network = models.OneToOneField(Network, on_delete=models.CASCADE)
    ca = models.ForeignKey(CA, on_delete=models.CASCADE)

    def __str__(self):
        """Return the name of the referenced CA."""
        return self.ca.name
class Group(TimestampedModel):
    """Named group inside a network; names are unique per network."""

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)
    description = models.TextField(blank=True)

    class Meta:
        constraints = [
            UniqueConstraint(
                fields=("network", "name"),
                name="unique_group_name_per_network",
            ),
        ]

    def __str__(self):
        """Return the group name."""
        return self.name
class GroupConfig(TimestampedModel):
    """A single configuration override (key/value) attached to a group.

    Each key may be overridden at most once per group.
    """

    # (stored value, human-readable label) pairs accepted for ``key``.
    CONFIG_KEY_CHOICES = [
        ("lighthouse.dns.host", "Lighthouse DNS Host"),
        ("lighthouse.dns.port", "Lighthouse DNS Port"),
        ("lighthouse.serve_dns", "Lighthouse Serve DNS"),
        ("listen.host", "Listen Host"),
        ("listen.port", "Listen Port"),
        ("punchy.punch", "Punchy Punch"),
        ("punchy.delay", "Punchy Delay"),
        ("punchy.respond", "Punchy Respond"),
        ("punchy.respond_delay", "Punchy Respond Delay"),
        ("stats.type", "Stats Type"),
        ("stats.prefix", "Stats Prefix"),
        ("stats.protocol", "Stats Protocol"),
        ("stats.host", "Stats Host"),
        ("stats.interval", "Stats Interval"),
        ("stats.path", "Stats Path"),
        ("stats.namespace", "Stats Namespace"),
        ("stats.subsystem", "Stats Subsystem"),
        ("stats.message_metrics", "Stats Message Metrics"),
        ("stats.lighthouse_metrics", "Stats Lighthouse Metrics"),
    ]

    group = models.ForeignKey(
        Group,
        on_delete=models.CASCADE,
        related_name="config_overrides",
    )
    key = models.CharField(
        max_length=255,
        choices=CONFIG_KEY_CHOICES,
        help_text="Configuration property to override or add.",
    )
    value = models.TextField(
        help_text="The value to override the config property with.",
    )

    class Meta:
        constraints = [
            UniqueConstraint(
                fields=("group", "key"),
                name="unique_group_config_key",
            ),
        ]

    def __str__(self):
        """Return "<group name> - <key>"."""
        return f"{self.group.name} - {self.key}"
class Rule(TimestampedModel):
    """Traffic rule owned by a group (``security_group``).

    A rule has a direction, protocol and port, and can optionally be narrowed
    by a single group, several groups, a CIDR and a local CIDR.
    """

    class Direction(models.TextChoices):
        INBOUND = "I", "inbound"
        OUTBOUND = "O", "outbound"

    class Protocol(models.TextChoices):
        ANY = "any", "any"
        UDP = "udp", "udp"
        TCP = "tcp", "tcp"
        ICMP = "icmp", "icmp"

    # The group this rule belongs to.
    security_group = models.ForeignKey(
        Group,
        on_delete=models.CASCADE,
        related_name="rules",
    )

    direction = models.CharField(
        max_length=10,
        choices=Direction.choices,
        default=Direction.INBOUND,
    )

    proto = models.CharField(
        max_length=4,
        choices=Protocol.choices,
        default=Protocol.ANY,
        help_text="One of any, tcp, udp, or icmp",
    )

    port = models.CharField(
        max_length=255,
        help_text=(
            "Takes 0 or any as any, a single number (e.g. 80), a range (e.g. 200-901), "
            "or fragment to match second and further fragments of fragmented packets "
            "(since there is no port available)."
        ),
        default="any",
    )

    # Optional single-group matcher.
    group = models.ForeignKey(
        Group,
        on_delete=models.CASCADE,
        help_text="Can be any or a literal group name, ie default-group",
        blank=True,
        null=True,
        related_name="fw_groups",
    )

    # Optional multi-group matcher.
    groups = models.ManyToManyField(
        Group,
        blank=True,
        related_name="fw_groupss",
        help_text=(
            "Same as group but accepts multiple values. Multiple values are AND'd together "
            "and a certificate must contain all groups to pass."
        ),
    )

    cidr = models.CharField(
        max_length=255,
        help_text="a CIDR, 0.0.0.0/0 is any. This restricts which Nebula IP addresses the rule allows.",
        blank=True,
        null=True,
    )

    local_cidr = models.CharField(
        max_length=255,
        help_text=(
            "a local CIDR, 0.0.0.0/0 is any. This restricts which destination IP addresses, "
            "when using unsafe_routes, the rule allows. If unset, the rule will allow access "
            "to the specified ports on both the node itself as well as any IP addresses it routes to."
        ),
        blank=True,
        null=True,
    )

    description = models.CharField(
        max_length=255,
        blank=True,
        help_text="Optional description for this rule",
    )
class Host(TimestampedModel):
    """A host enrolled in a network.

    Host names and assigned IPs are each unique within a network.
    """

    class Meta:
        unique_together = (("network", "name"), ("network", "assigned_ip"))

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)
    assigned_ip = models.CharField(max_length=200, blank=True, null=True)

    is_lighthouse = models.BooleanField(default=False)
    public_ip_or_hostname = models.CharField(max_length=200, blank=True, null=True)

    is_relay = models.BooleanField(default=False)
    use_relay = models.BooleanField(default=True)

    public_key = models.TextField(max_length=1000, blank=True, null=True)
    public_auth_kid = models.CharField(max_length=200, blank=True, null=True)
    public_auth_key = models.TextField(max_length=1000, blank=True, null=True)
    groups = models.ManyToManyField(Group, blank=True)
    interface = models.CharField(max_length=200, default="nebula1")

    # Read by is_config_stale; empty until a refresh has been recorded.
    last_config_refresh = models.DateTimeField(blank=True, null=True)

    config_freeze = models.BooleanField(
        default=False,
        help_text="When true, host will not receive automatic config updates",
    )

    is_ephemeral = models.BooleanField(
        default=False,
        help_text="When true, this host will be removed if offline for over 10 minutes",
    )
    cli_version = models.CharField(
        max_length=50, blank=True, help_text="Client CLI version"
    )
    upgrade_requested = models.BooleanField(default=False)

    def __str__(self):
        """Return the host name."""
        return self.name

    @property
    def is_config_stale(self):
        """Return True if the config was never refreshed or is over 24 hours old."""
        if not self.last_config_refresh:
            return True

        stale_threshold = timezone.now() - timedelta(hours=24)
        return self.last_config_refresh < stale_threshold

    @property
    def is_cli_version_outdated(self) -> bool | str:
        """Compare the host's CLI version with the latest published one.

        Returns:
            The string "Unknown" when either the host's version or the latest
            version is unavailable; otherwise True if the two differ and False
            if they are equal.
        """
        if not self.cli_version:
            return "Unknown"

        latest_version = self.get_latest_cli_version
        if not latest_version:
            return "Unknown"

        return self.cli_version != latest_version

    @property
    def get_latest_cli_version(self) -> str | None:
        """Fetch the latest meshadmin version from the PyPI JSON API.

        This is a best-effort lookup: it never raises. Failures are logged as
        warnings so that a persistent "Unknown" version status can be diagnosed.

        NOTE(review): every access performs an HTTP request; callers that
        evaluate this for many hosts may want to cache the result.

        Returns:
            The version string from the response's ``info.version`` field, or
            None if the request fails, returns a non-200 status, or the
            payload does not have the expected structure.
        """
        try:
            response = httpx.get("https://pypi.org/pypi/meshadmin/json")
            if response.status_code == 200:
                return response.json()["info"]["version"]
            logger.warning(
                "unexpected status fetching latest cli version",
                status_code=response.status_code,
            )
        except Exception as e:
            # Deliberately broad: the lookup must never break the caller, but
            # the failure is recorded instead of being silently dropped.
            logger.warning("error fetching latest cli version", error=e)
        return None
class HostCert(TimestampedModel):
    """Certificate stored for a host, with a reference to a CA."""

    host = models.ForeignKey(Host, on_delete=models.CASCADE)
    ca = models.ForeignKey(CA, on_delete=models.CASCADE)
    cert = models.TextField(max_length=1000)
    hash = models.IntegerField(default=0)
class HostConfig(TimestampedModel):
    """Configuration text stored for a host, with an optional sha256 value."""

    host = models.ForeignKey(Host, on_delete=models.CASCADE)
    config = models.TextField()
    sha256 = models.CharField(max_length=200, blank=True, null=True)
class Template(TimestampedModel):
    """Enrollment template for a network.

    Holds the host settings (lighthouse/relay flags, groups) together with an
    enrollment key and that key's usage limits and expiry.
    """

    name = models.CharField(max_length=200)
    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    is_lighthouse = models.BooleanField(default=False)
    is_relay = models.BooleanField(default=False)
    use_relay = models.BooleanField(default=True)
    groups = models.ManyToManyField(Group, blank=True)
    # Defaults to a freshly generated uuid4 and must be unique across templates.
    enrollment_key = models.CharField(
        max_length=255,
        default=uuid4,
        unique=True,
    )

    reusable = models.BooleanField(
        default=True,
        help_text="When false, this key can not be used multiple times",
    )
    usage_limit = models.IntegerField(
        null=True,
        blank=True,
        help_text="Maximum number of peers that can enroll with this key. Null means unlimited.",
    )
    expires_at = models.DateTimeField(
        null=True,
        blank=True,
        help_text="When this key expires. Null means no expiration.",
    )
    usage_count = models.IntegerField(
        default=0,
        help_text="Number of times this key has been used",
    )
    ephemeral_peers = models.BooleanField(
        default=False,
        help_text="When true, peers that are offline for over 10 minutes will be removed",
    )

    def __str__(self):
        """Return the template name."""
        return self.name
class ConfigRollout(TimestampedModel):
    """A named configuration rollout within a network.

    Records a status plus the hosts that are targeted and the hosts that have
    completed.
    """

    name = models.CharField(max_length=200)
    status = models.CharField(
        max_length=20,
        choices=[
            ("PENDING", "Pending"),
            ("IN_PROGRESS", "In Progress"),
            ("COMPLETED", "Completed"),
            ("FAILED", "Failed"),
        ],
        default="PENDING",
    )
    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    target_hosts = models.ManyToManyField(
        Host,
        related_name="pending_rollouts",
    )
    completed_hosts = models.ManyToManyField(
        Host,
        related_name="completed_rollouts",
    )
    notes = models.TextField(blank=True)