Coverage for src/meshadmin/server/networks/models.py: 93%
148 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-22 07:09 +0200
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-22 07:09 +0200
1from datetime import datetime, timedelta
2from uuid import uuid4
4import structlog
5from django.contrib.auth import get_user_model
6from django.db import models
7from django.db.models import UniqueConstraint
8from django.utils import timezone
10User = get_user_model()
12logger = structlog.get_logger(__name__)
15class TimestampedModel(models.Model):
16 created_at = models.DateTimeField(auto_now_add=True)
17 updated_at = models.DateTimeField(auto_now=True)
19 class Meta:
20 abstract = True
23class NetworkMembership(TimestampedModel):
24 class Role(models.TextChoices):
25 ADMIN = "ADMIN", "Admin"
26 MEMBER = "MEMBER", "Member"
28 network = models.ForeignKey(
29 "Network", on_delete=models.CASCADE, related_name="memberships"
30 )
31 user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="memberships")
32 role = models.CharField(max_length=20, choices=Role.choices, default=Role.MEMBER)
34 class Meta:
35 constraints = [
36 UniqueConstraint(
37 fields=("network", "user"), name="unique_network_membership"
38 ),
39 ]
41 def __str__(self):
42 return f"{self.user} - {self.network} {self.role}"
45class Network(TimestampedModel):
46 name = models.CharField(max_length=200, unique=True)
47 cidr = models.CharField(max_length=200, default="100.100.64.0/24")
48 update_interval = models.IntegerField(
49 default=5, help_text="Interval in seconds for host configuration updates"
50 )
51 members = models.ManyToManyField(
52 User, through="NetworkMembership", related_name="networks"
53 )
55 def __str__(self):
56 return self.name
59class CA(TimestampedModel):
60 network = models.ForeignKey(Network, on_delete=models.CASCADE)
61 name = models.CharField(max_length=200)
63 key = models.TextField()
64 cert = models.TextField()
65 cert_print = models.JSONField(blank=True, null=True)
67 def __str__(self):
68 return self.name
70 @property
71 def days_until_expiry(self):
72 if (
73 not self.cert_print
74 or "details" not in self.cert_print
75 or "notAfter" not in self.cert_print["details"]
76 ):
77 return None
79 expiry_str = self.cert_print["details"]["notAfter"]
80 try:
81 expiry_date = datetime.fromisoformat(expiry_str)
82 days = (expiry_date - timezone.now()).days
83 return max(0, days)
84 except (ValueError, TypeError) as e:
85 logger.error("error parsing expiry date", ca_name=self.name, error=e)
86 return None
89class SigningCA(TimestampedModel):
90 network = models.OneToOneField(Network, on_delete=models.CASCADE)
91 ca = models.ForeignKey(CA, on_delete=models.CASCADE)
93 def __str__(self):
94 return self.ca.name
97class Group(TimestampedModel):
98 network = models.ForeignKey(Network, on_delete=models.CASCADE)
99 name = models.CharField(max_length=200)
100 description = models.TextField(blank=True)
102 class Meta:
103 constraints = [
104 UniqueConstraint(
105 fields=("network", "name"), name="unique_group_name_per_network"
106 ),
107 ]
109 def __str__(self):
110 return self.name
113class GroupConfig(TimestampedModel):
114 CONFIG_KEY_CHOICES = [
115 ("lighthouse.dns.host", "Lighthouse DNS Host"),
116 ("lighthouse.dns.port", "Lighthouse DNS Port"),
117 ("lighthouse.serve_dns", "Lighthouse Serve DNS"),
118 ]
120 group = models.ForeignKey(
121 Group, on_delete=models.CASCADE, related_name="config_overrides"
122 )
123 key = models.CharField(
124 max_length=255,
125 choices=CONFIG_KEY_CHOICES,
126 help_text="Configuration property to override or add.",
127 )
128 value = models.TextField(
129 help_text="The value to override the config property with."
130 )
132 class Meta:
133 constraints = [
134 UniqueConstraint(fields=("group", "key"), name="unique_group_config_key"),
135 ]
137 def __str__(self):
138 return f"{self.group.name} - {self.key}"
141class Rule(TimestampedModel):
142 class Direction(models.TextChoices):
143 INBOUND = "I", "inbound"
144 OUTBOUND = "O", "outbound"
146 security_group = models.ForeignKey(
147 Group, on_delete=models.CASCADE, related_name="rules"
148 )
150 direction = models.CharField(
151 max_length=10, choices=Direction.choices, default=Direction.INBOUND
152 )
154 class Protocol(models.TextChoices):
155 ANY = "any", "any"
156 UDP = "udp", "udp"
157 TCP = "tcp", "tcp"
158 ICMP = "icmp", "icmp"
160 proto = models.CharField(
161 max_length=4,
162 choices=Protocol.choices,
163 default=Protocol.ANY,
164 help_text="One of any, tcp, udp, or icmp",
165 )
167 port = models.CharField(
168 max_length=255,
169 help_text=(
170 "Takes 0 or any as any, a single number (e.g. 80), a range (e.g. 200-901), "
171 "or fragment to match second and further fragments of fragmented packets "
172 "(since there is no port available)."
173 ),
174 default="any",
175 )
177 group = models.ForeignKey(
178 Group,
179 on_delete=models.CASCADE,
180 help_text="Can be any or a literal group name, ie default-group",
181 blank=True,
182 null=True,
183 related_name="fw_groups",
184 )
186 groups = models.ManyToManyField(
187 Group,
188 blank=True,
189 related_name="fw_groupss",
190 help_text=(
191 "Same as group but accepts multiple values. Multiple values are AND'd together "
192 "and a certificate must contain all groups to pass."
193 ),
194 )
196 cidr = models.CharField(
197 max_length=255,
198 help_text="a CIDR, 0.0.0.0/0 is any. This restricts which Nebula IP addresses the rule allows.",
199 blank=True,
200 null=True,
201 )
203 local_cidr = models.CharField(
204 max_length=255,
205 help_text=(
206 "a local CIDR, 0.0.0.0/0 is any. This restricts which destination IP addresses, "
207 "when using unsafe_routes, the rule allows. If unset, the rule will allow access "
208 "to the specified ports on both the node itself as well as any IP addresses it routes to."
209 ),
210 blank=True,
211 null=True,
212 )
215class Host(TimestampedModel):
216 class Meta:
217 unique_together = (("network", "name"), ("network", "assigned_ip"))
219 network = models.ForeignKey(Network, on_delete=models.CASCADE)
220 name = models.CharField(max_length=200)
221 assigned_ip = models.CharField(max_length=200, blank=True, null=True)
223 is_lighthouse = models.BooleanField(default=False)
224 public_ip_or_hostname = models.CharField(max_length=200, blank=True, null=True)
226 is_relay = models.BooleanField(default=False)
227 use_relay = models.BooleanField(default=True)
229 public_key = models.TextField(max_length=1000, blank=True, null=True)
230 public_auth_kid = models.CharField(max_length=200, blank=True, null=True)
231 public_auth_key = models.TextField(max_length=1000, blank=True, null=True)
232 groups = models.ManyToManyField(Group, blank=True)
233 interface = models.CharField(max_length=200, default="nebula1")
235 last_config_refresh = models.DateTimeField(blank=True, null=True)
237 config_freeze = models.BooleanField(
238 default=False,
239 help_text="When true, host will not receive automatic config updates",
240 )
242 is_ephemeral = models.BooleanField(
243 default=False,
244 help_text="When true, this host will be removed if offline for over 10 minutes",
245 )
247 def __str__(self):
248 return self.name
250 @property
251 def is_config_stale(self):
252 if not self.last_config_refresh:
253 return True
255 stale_threshold = timezone.now() - timedelta(hours=24)
256 return self.last_config_refresh < stale_threshold
259class HostCert(TimestampedModel):
260 host = models.ForeignKey(Host, on_delete=models.CASCADE)
261 ca = models.ForeignKey(CA, on_delete=models.CASCADE)
262 cert = models.TextField(max_length=1000)
263 hash = models.IntegerField(default=0)
266class HostConfig(TimestampedModel):
267 host = models.ForeignKey(Host, on_delete=models.CASCADE)
268 config = models.TextField()
269 sha256 = models.CharField(max_length=200, blank=True, null=True)
272class Template(TimestampedModel):
273 name = models.CharField(max_length=200)
274 network = models.ForeignKey(Network, on_delete=models.CASCADE)
275 is_lighthouse = models.BooleanField(default=False)
276 is_relay = models.BooleanField(default=False)
277 use_relay = models.BooleanField(default=True)
278 groups = models.ManyToManyField(Group, blank=True)
279 enrollment_key = models.CharField(max_length=255, default=uuid4, unique=True)
281 reusable = models.BooleanField(
282 default=True, help_text="When false, this key can not be used multiple times"
283 )
284 usage_limit = models.IntegerField(
285 null=True,
286 blank=True,
287 help_text="Maximum number of peers that can enroll with this key. Null means unlimited.",
288 )
289 expires_at = models.DateTimeField(
290 null=True,
291 blank=True,
292 help_text="When this key expires. Null means no expiration.",
293 )
294 usage_count = models.IntegerField(
295 default=0, help_text="Number of times this key has been used"
296 )
297 ephemeral_peers = models.BooleanField(
298 default=False,
299 help_text="When true, peers that are offline for over 10 minutes will be removed",
300 )
302 def __str__(self):
303 return self.name
306class ConfigRollout(TimestampedModel):
307 name = models.CharField(max_length=200)
308 status = models.CharField(
309 max_length=20,
310 choices=[
311 ("PENDING", "Pending"),
312 ("IN_PROGRESS", "In Progress"),
313 ("COMPLETED", "Completed"),
314 ("FAILED", "Failed"),
315 ],
316 default="PENDING",
317 )
318 network = models.ForeignKey(Network, on_delete=models.CASCADE)
319 target_hosts = models.ManyToManyField(Host, related_name="pending_rollouts")
320 completed_hosts = models.ManyToManyField(Host, related_name="completed_rollouts")
321 notes = models.TextField(blank=True)