Coverage for src/meshadmin/server/networks/models.py: 92%
167 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-22 07:26 +0200
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-22 07:26 +0200
1from datetime import datetime, timedelta
2from uuid import uuid4
4import httpx
5import structlog
6from django.contrib.auth import get_user_model
7from django.db import models
8from django.db.models import UniqueConstraint
9from django.utils import timezone
11User = get_user_model()
13logger = structlog.get_logger(__name__)
class TimestampedModel(models.Model):
    """Abstract base model that adds creation and modification timestamps."""

    class Meta:
        abstract = True

    # Set once when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)
    # Refreshed on every save.
    updated_at = models.DateTimeField(auto_now=True)
class NetworkMembership(TimestampedModel):
    """Membership of a user in a network, carrying the user's role.

    A (network, user) pair may only appear once.
    """

    class Role(models.TextChoices):
        ADMIN = "ADMIN", "Admin"
        MEMBER = "MEMBER", "Member"

    class Meta:
        constraints = [
            UniqueConstraint(
                name="unique_network_membership", fields=("network", "user")
            ),
        ]

    network = models.ForeignKey(
        "Network", related_name="memberships", on_delete=models.CASCADE
    )
    user = models.ForeignKey(User, related_name="memberships", on_delete=models.CASCADE)
    role = models.CharField(choices=Role.choices, default=Role.MEMBER, max_length=20)

    def __str__(self):
        return "{} - {} {}".format(self.user, self.network, self.role)
class Network(TimestampedModel):
    """A uniquely named network with its CIDR and member users."""

    name = models.CharField(unique=True, max_length=200)
    cidr = models.CharField(default="100.100.64.0/24", max_length=200)
    update_interval = models.IntegerField(
        help_text="Interval in seconds for host configuration updates", default=5
    )
    # Users are attached through NetworkMembership rows.
    members = models.ManyToManyField(
        User, related_name="networks", through="NetworkMembership"
    )

    def __str__(self):
        network_name = self.name
        return network_name
class CA(TimestampedModel):
    """Certificate authority belonging to a network.

    Stores the CA key and certificate as text, plus an optional JSON
    rendering of the certificate in ``cert_print``.
    """

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)

    key = models.TextField()
    cert = models.TextField()
    cert_print = models.JSONField(blank=True, null=True)

    def __str__(self):
        return self.name

    @property
    def days_until_expiry(self):
        """Return the whole days left until the certificate expires.

        The expiry is read from ``cert_print["details"]["notAfter"]``.

        Returns:
            ``None`` when the expiry is missing or cannot be parsed,
            otherwise the number of days remaining, never below 0.
        """
        # Guard against cert_print (or its "details") not being a mapping so
        # malformed JSON yields None instead of raising.
        details = (
            self.cert_print.get("details")
            if isinstance(self.cert_print, dict)
            else None
        )
        if not isinstance(details, dict) or "notAfter" not in details:
            return None

        expiry_str = details["notAfter"]
        try:
            # datetime.fromisoformat() only accepts a trailing "Z" (UTC) from
            # Python 3.11 on; normalise it so older interpreters parse it too.
            if isinstance(expiry_str, str) and expiry_str.endswith("Z"):
                expiry_str = f"{expiry_str[:-1]}+00:00"
            expiry_date = datetime.fromisoformat(expiry_str)

            now = timezone.now()
            if timezone.is_naive(expiry_date) and timezone.is_aware(now):
                # A timestamp without an offset cannot be subtracted from an
                # aware "now"; interpret it in the timezone of "now".
                expiry_date = expiry_date.replace(tzinfo=now.tzinfo)

            days = (expiry_date - now).days
            return max(0, days)
        except (ValueError, TypeError) as e:
            logger.error("error parsing expiry date", ca_name=self.name, error=e)
            return None
class SigningCA(TimestampedModel):
    """Links a network (one-to-one) to a CA."""

    network = models.OneToOneField(Network, on_delete=models.CASCADE)
    ca = models.ForeignKey(CA, on_delete=models.CASCADE)

    def __str__(self):
        signing_ca = self.ca
        return signing_ca.name
class Group(TimestampedModel):
    """A named group inside a network; names are unique per network."""

    class Meta:
        constraints = [
            UniqueConstraint(
                name="unique_group_name_per_network", fields=("network", "name")
            ),
        ]

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)
    description = models.TextField(blank=True)

    def __str__(self):
        group_name = self.name
        return group_name
class GroupConfig(TimestampedModel):
    """A single configuration override attached to a group.

    Each group may define a given key at most once.
    """

    # Keys that may be overridden, with their display labels.
    CONFIG_KEY_CHOICES = [
        ("lighthouse.dns.host", "Lighthouse DNS Host"),
        ("lighthouse.dns.port", "Lighthouse DNS Port"),
        ("lighthouse.serve_dns", "Lighthouse Serve DNS"),
    ]

    class Meta:
        constraints = [
            UniqueConstraint(name="unique_group_config_key", fields=("group", "key")),
        ]

    group = models.ForeignKey(
        Group, related_name="config_overrides", on_delete=models.CASCADE
    )
    key = models.CharField(
        help_text="Configuration property to override or add.",
        choices=CONFIG_KEY_CHOICES,
        max_length=255,
    )
    value = models.TextField(
        help_text="The value to override the config property with."
    )

    def __str__(self):
        return "{} - {}".format(self.group.name, self.key)
class Rule(TimestampedModel):
    """A firewall rule owned by a security group.

    A rule has a direction, protocol and port, and may be narrowed by a
    single group, a set of groups, a CIDR and/or a local CIDR.
    """

    class Direction(models.TextChoices):
        INBOUND = "I", "inbound"
        OUTBOUND = "O", "outbound"

    class Protocol(models.TextChoices):
        ANY = "any", "any"
        UDP = "udp", "udp"
        TCP = "tcp", "tcp"
        ICMP = "icmp", "icmp"

    # The group this rule belongs to.
    security_group = models.ForeignKey(
        Group, related_name="rules", on_delete=models.CASCADE
    )

    direction = models.CharField(
        choices=Direction.choices, default=Direction.INBOUND, max_length=10
    )

    proto = models.CharField(
        help_text="One of any, tcp, udp, or icmp",
        choices=Protocol.choices,
        default=Protocol.ANY,
        max_length=4,
    )

    port = models.CharField(
        default="any",
        help_text=(
            "Takes 0 or any as any, a single number (e.g. 80), a range (e.g. 200-901), "
            "or fragment to match second and further fragments of fragmented packets "
            "(since there is no port available)."
        ),
        max_length=255,
    )

    group = models.ForeignKey(
        Group,
        related_name="fw_groups",
        null=True,
        blank=True,
        help_text="Can be any or a literal group name, ie default-group",
        on_delete=models.CASCADE,
    )

    # NOTE(review): "fw_groupss" looks like a typo, but "fw_groups" is already
    # used by ``group`` above; renaming would change the reverse accessor, so
    # it is left as-is.
    groups = models.ManyToManyField(
        Group,
        help_text=(
            "Same as group but accepts multiple values. Multiple values are AND'd together "
            "and a certificate must contain all groups to pass."
        ),
        related_name="fw_groupss",
        blank=True,
    )

    cidr = models.CharField(
        null=True,
        blank=True,
        help_text=(
            "a CIDR, 0.0.0.0/0 is any. This restricts which Nebula IP addresses the rule allows."
        ),
        max_length=255,
    )

    local_cidr = models.CharField(
        null=True,
        blank=True,
        help_text=(
            "a local CIDR, 0.0.0.0/0 is any. This restricts which destination IP addresses, "
            "when using unsafe_routes, the rule allows. If unset, the rule will allow access "
            "to the specified ports on both the node itself as well as any IP addresses it routes to."
        ),
        max_length=255,
    )
class Host(TimestampedModel):
    """A host enrolled in a network.

    Host names and assigned IPs are unique within a network. Besides the
    stored fields, the model exposes computed properties for configuration
    staleness and CLI version freshness.
    """

    class Meta:
        unique_together = (("network", "name"), ("network", "assigned_ip"))

    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    name = models.CharField(max_length=200)
    assigned_ip = models.CharField(max_length=200, blank=True, null=True)

    is_lighthouse = models.BooleanField(default=False)
    public_ip_or_hostname = models.CharField(max_length=200, blank=True, null=True)

    is_relay = models.BooleanField(default=False)
    use_relay = models.BooleanField(default=True)

    public_key = models.TextField(max_length=1000, blank=True, null=True)
    public_auth_kid = models.CharField(max_length=200, blank=True, null=True)
    public_auth_key = models.TextField(max_length=1000, blank=True, null=True)
    groups = models.ManyToManyField(Group, blank=True)
    interface = models.CharField(max_length=200, default="nebula1")

    last_config_refresh = models.DateTimeField(blank=True, null=True)

    config_freeze = models.BooleanField(
        default=False,
        help_text="When true, host will not receive automatic config updates",
    )

    is_ephemeral = models.BooleanField(
        default=False,
        help_text="When true, this host will be removed if offline for over 10 minutes",
    )
    cli_version = models.CharField(
        max_length=50, blank=True, help_text="Client CLI version"
    )

    def __str__(self):
        return self.name

    @property
    def is_config_stale(self):
        """Return True when the config was never refreshed or is older than 24 hours."""
        if not self.last_config_refresh:
            return True

        stale_threshold = timezone.now() - timedelta(hours=24)
        return self.last_config_refresh < stale_threshold

    @property
    def is_cli_version_outdated(self) -> bool | str:
        """Compare the host's CLI version with the latest published one.

        Returns:
            ``"Unknown"`` when either version is unavailable, otherwise
            ``True`` if the versions differ and ``False`` if they match.
        """
        if not self.cli_version:
            return "Unknown"

        latest_version = self.get_latest_cli_version
        if not latest_version:
            return "Unknown"

        return self.cli_version != latest_version

    @property
    def get_latest_cli_version(self) -> str | None:
        """Fetch the latest ``meshadmin`` version from the PyPI JSON API.

        This is a best-effort lookup: any failure is logged and ``None`` is
        returned instead of raising. Every access performs an HTTP request.

        Returns:
            The version string on success, otherwise ``None``.
        """
        try:
            response = httpx.get("https://pypi.org/pypi/meshadmin/json")
            if response.status_code == 200:
                return response.json()["info"]["version"]
            logger.warning(
                "unexpected status fetching latest cli version",
                status_code=response.status_code,
            )
        except Exception as e:
            # Deliberately broad: a failed version lookup must never break the
            # caller, but it should leave a trace instead of vanishing.
            logger.warning("error fetching latest cli version", error=e)
        return None
class HostCert(TimestampedModel):
    """A certificate stored for a host, together with the CA it references."""

    host = models.ForeignKey(Host, on_delete=models.CASCADE)
    ca = models.ForeignKey(CA, on_delete=models.CASCADE)
    cert = models.TextField(max_length=1000)
    # Integer hash value; defaults to 0 when not provided.
    hash = models.IntegerField(default=0)
class HostConfig(TimestampedModel):
    """A configuration text stored for a host, with an optional SHA-256 value."""

    host = models.ForeignKey(Host, on_delete=models.CASCADE)
    config = models.TextField()
    sha256 = models.CharField(null=True, blank=True, max_length=200)
class Template(TimestampedModel):
    """Enrollment template for a network.

    Holds host defaults (lighthouse/relay flags, groups) and a unique
    enrollment key with optional usage and expiry limits.
    """

    name = models.CharField(max_length=200)
    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    is_lighthouse = models.BooleanField(default=False)
    is_relay = models.BooleanField(default=False)
    use_relay = models.BooleanField(default=True)
    groups = models.ManyToManyField(Group, blank=True)
    # A fresh uuid4 is generated as the default key.
    enrollment_key = models.CharField(unique=True, default=uuid4, max_length=255)

    reusable = models.BooleanField(
        help_text="When false, this key can not be used multiple times", default=True
    )
    usage_limit = models.IntegerField(
        help_text="Maximum number of peers that can enroll with this key. Null means unlimited.",
        blank=True,
        null=True,
    )
    expires_at = models.DateTimeField(
        help_text="When this key expires. Null means no expiration.",
        blank=True,
        null=True,
    )
    usage_count = models.IntegerField(
        help_text="Number of times this key has been used", default=0
    )
    ephemeral_peers = models.BooleanField(
        help_text="When true, peers that are offline for over 10 minutes will be removed",
        default=False,
    )

    def __str__(self):
        template_name = self.name
        return template_name
class ConfigRollout(TimestampedModel):
    """A named configuration rollout for a network.

    Tracks a status plus the hosts targeted by the rollout and the hosts
    that have completed it.
    """

    # Allowed status values with their display labels.
    STATUS_CHOICES = [
        ("PENDING", "Pending"),
        ("IN_PROGRESS", "In Progress"),
        ("COMPLETED", "Completed"),
        ("FAILED", "Failed"),
    ]

    name = models.CharField(max_length=200)
    status = models.CharField(
        default="PENDING",
        choices=STATUS_CHOICES,
        max_length=20,
    )
    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    target_hosts = models.ManyToManyField(Host, related_name="pending_rollouts")
    completed_hosts = models.ManyToManyField(Host, related_name="completed_rollouts")
    notes = models.TextField(blank=True)