Coverage for src/meshadmin/server/networks/forms.py: 88%
247 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-09 15:09 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-09 15:09 +0200
1import ipaddress
2from datetime import timedelta
4import structlog
5from django import forms
6from django.contrib.auth import get_user_model
7from django.core.exceptions import ValidationError
8from django.shortcuts import get_object_or_404
9from django.utils import timezone
11from meshadmin.server.networks.models import (
12 CA,
13 Group,
14 GroupConfig,
15 Host,
16 Network,
17 NetworkMembership,
18 Rule,
19 Template,
20)
21from meshadmin.server.networks.services import (
22 create_network,
23 create_network_ca,
24 network_available_hosts_iterator,
25)
27logger = structlog.get_logger(__name__)
29User = get_user_model()
31RECOMMENDED_RANGES = [
32 ("192.168.0.0/16", "Private network range, ideal for small networks"),
33 ("172.16.0.0/12", "Private network range, good for medium networks"),
34 ("10.0.0.0/8", "Private network range, suitable for large networks"),
35 ("100.64.0.0/10", "Carrier-grade NAT range, recommended by Nebula docs"),
36]
39class NetworkForm(forms.ModelForm):
40 class Meta:
41 model = Network
42 fields = ("name", "cidr", "update_interval")
43 widgets = {
44 "update_interval": forms.NumberInput(attrs={"min": 5, "max": 3600}),
45 }
47 def __init__(self, *args, **kwargs):
48 self.request = kwargs.pop("request", None)
49 super().__init__(*args, **kwargs)
51 def clean_cidr(self):
52 cidr = self.cleaned_data["cidr"]
54 try:
55 network = ipaddress.ip_network(cidr)
56 except ValueError as e:
57 raise ValidationError(f"Invalid CIDR format: {str(e)}")
59 in_recommended_range = False
60 for recommended_cidr, _ in RECOMMENDED_RANGES:
61 if ipaddress.ip_network(cidr).subnet_of(
62 ipaddress.ip_network(recommended_cidr)
63 ):
64 in_recommended_range = True
65 break
67 if not in_recommended_range:
68 suggestions = []
69 network_size = network.num_addresses
70 for recommended_cidr, description in RECOMMENDED_RANGES:
71 rec_network = ipaddress.ip_network(recommended_cidr)
72 if network_size <= rec_network.num_addresses:
73 suggestions.append(f"{recommended_cidr} - {description}")
75 raise ValidationError(
76 f"Warning: The CIDR {cidr} is outside recommended network ranges. "
77 "Consider using one of these ranges instead:\n"
78 + "\n".join(f"• {suggestion}" for suggestion in suggestions)
79 )
81 return cidr
83 def save(self, commit=True):
84 logger.info("save network")
85 instance = super().save(commit=False)
86 if not instance.pk:
87 instance = create_network(
88 network_name=self.cleaned_data["name"],
89 network_cidr=self.cleaned_data["cidr"],
90 update_interval=self.cleaned_data["update_interval"],
91 user=self.request.user,
92 )
93 elif commit:
94 instance.save()
96 self.save_m2m()
97 return instance
100class CAForm(forms.ModelForm):
101 class Meta:
102 model = CA
103 fields = ("name", "network")
105 def __init__(self, *args, **kwargs):
106 network = kwargs.pop("network", None)
107 super().__init__(*args, **kwargs)
109 if network:
110 self.fields.pop("network")
111 self.instance.network = network
113 def save(self, commit=True):
114 instance = super().save(commit=False)
116 if instance.pk:
117 if commit:
118 instance.save()
119 else:
120 instance = create_network_ca(instance.name, instance.network)
122 return instance
125class GroupForm(forms.ModelForm):
126 class Meta:
127 model = Group
128 fields = ("name", "description")
130 def __init__(self, *args, **kwargs):
131 network = kwargs.pop("network", None)
132 super().__init__(*args, **kwargs)
134 if network:
135 self.instance.network = network
138class TemplateForm(forms.ModelForm):
139 expiry_days = forms.IntegerField(
140 required=False,
141 min_value=1,
142 label="Expires in (days)",
143 help_text="Days until the key expires. Leave empty for no expiration.",
144 )
146 class Meta:
147 model = Template
148 fields = (
149 "name",
150 "network",
151 "is_lighthouse",
152 "is_relay",
153 "use_relay",
154 "groups",
155 "reusable",
156 "usage_limit",
157 "ephemeral_peers",
158 )
160 def __init__(self, *args, **kwargs):
161 network = kwargs.pop("network", None)
162 super().__init__(*args, **kwargs)
164 if self.instance and self.instance.pk and self.instance.expires_at:
165 days_remaining = (self.instance.expires_at - timezone.now()).days
166 if days_remaining > 0:
167 self.fields["expiry_days"].initial = days_remaining
169 network_id = None
170 if network:
171 network_id = network.id
172 self.fields.pop("network")
173 self.instance.network = network
174 elif self.instance and self.instance.pk:
175 network_id = self.instance.network_id
176 elif "initial" in kwargs and "network_id" in kwargs["initial"]:
177 network_id = kwargs["initial"]["network_id"]
179 if network_id:
180 self.fields["groups"].queryset = Group.objects.filter(
181 network_id=network_id
182 ).all()
184 def save(self, commit=True):
185 instance = super().save(commit=False)
187 expiry_days = self.cleaned_data.get("expiry_days")
188 if expiry_days:
189 instance.expires_at = timezone.now() + timedelta(days=expiry_days)
190 elif "expiry_days" in self.changed_data:
191 instance.expires_at = None
193 if commit:
194 instance.save()
195 self.save_m2m()
197 return instance
200class HostForm(forms.ModelForm):
201 class Meta:
202 model = Host
203 fields = (
204 "name",
205 "network",
206 "assigned_ip",
207 "is_lighthouse",
208 "is_relay",
209 "use_relay",
210 "groups",
211 "public_ip_or_hostname",
212 "public_auth_key",
213 "interface",
214 )
216 def __init__(self, *args, **kwargs):
217 network = kwargs.pop("network", None)
218 super().__init__(*args, **kwargs)
220 network_id = None
221 if network:
222 network_id = network.id
223 self.fields.pop("network")
224 self.instance.network = network
225 elif self.instance and self.instance.pk:
226 network_id = self.instance.network_id
228 if network_id:
229 self.fields["groups"].queryset = Group.objects.filter(
230 network_id=network_id
231 ).all()
233 if not self.instance.pk:
234 network = Network.objects.get(id=network_id)
235 ipv4_iterator = network_available_hosts_iterator(network)
236 self.initial["assigned_ip"] = next(ipv4_iterator)
239class RuleForm(forms.ModelForm):
240 class Meta:
241 model = Rule
242 fields = (
243 "description",
244 "direction",
245 "proto",
246 "port",
247 "group",
248 "groups",
249 "cidr",
250 "local_cidr",
251 )
253 def __init__(self, *args, **kwargs):
254 super().__init__(*args, **kwargs)
255 security_group_id = kwargs.get("initial", {}).get("security_group_id")
256 if not security_group_id and kwargs.get("instance"):
257 security_group_id = kwargs["instance"].security_group_id
259 if security_group_id:
260 security_group = get_object_or_404(Group, id=security_group_id)
261 group_queryset = Group.objects.filter(network_id=security_group.network_id)
262 self.fields["group"].queryset = group_queryset
263 self.fields["groups"].queryset = group_queryset
265 def clean(self):
266 cleaned_data = super().clean()
268 # Validate port format
269 port = cleaned_data.get("port")
270 if port and port not in ("0", "any", "fragment"):
271 if "-" in port:
272 try:
273 start, end = map(int, port.split("-"))
274 if not (0 <= start <= 65535 and 0 <= end <= 65535):
275 raise ValueError
276 except ValueError:
277 raise ValidationError(
278 {
279 "port": "Port range must be two valid port numbers (0-65535) separated by a hyphen"
280 }
281 )
282 elif not port.isdigit() or not 0 <= int(port) <= 65535:
283 raise ValidationError(
284 {
285 "port": "Port must be 'any', 'fragment', or a number between 0 and 65535"
286 }
287 )
289 # Validate that at least one target specification exists
290 group = cleaned_data.get("group")
291 groups = cleaned_data.get("groups")
292 cidr = cleaned_data.get("cidr")
294 if not any([group, groups.exists() if groups else False, cidr]):
295 raise ValidationError(
296 "At least one of group, groups, or CIDR must be specified to identify "
297 "which hosts the rule applies to."
298 )
300 # Validate CIDR format if provided
301 if cidr:
302 try:
303 ipaddress.ip_network(cidr)
304 except ValueError:
305 raise ValidationError(
306 {"cidr": "Invalid CIDR format. Example: 0.0.0.0/0"}
307 )
309 # Validate local_cidr format if provided
310 local_cidr = cleaned_data.get("local_cidr")
311 if local_cidr:
312 try:
313 ipaddress.ip_network(local_cidr)
314 except ValueError:
315 raise ValidationError(
316 {"local_cidr": "Invalid CIDR format. Example: 0.0.0.0/0"}
317 )
319 return cleaned_data
322class NetworkMembershipForm(forms.ModelForm):
323 email = forms.EmailField(
324 help_text="Enter the email address of the user you want to add to the network"
325 )
327 class Meta:
328 model = NetworkMembership
329 fields = ["role"]
331 def __init__(self, *args, **kwargs):
332 self.network = kwargs.pop("network", None)
333 super().__init__(*args, **kwargs)
334 if self.instance.pk:
335 self.fields["email"].initial = self.instance.user.email
336 self.fields["email"].disabled = True
338 def clean_email(self):
339 email = self.cleaned_data["email"]
340 user = User.objects.filter(email=email).first()
341 if not user:
342 raise ValidationError("No user found with this email address")
343 if (
344 self.network
345 and NetworkMembership.objects.filter(
346 network=self.network, user=user
347 ).exists()
348 ):
349 raise ValidationError("This user is already a member of the network")
350 return email
352 def save(self, commit=True):
353 instance = super().save(commit=False)
354 if not instance.pk:
355 email = self.cleaned_data["email"]
356 user = User.objects.get(email=email)
357 instance.user = user
358 instance.network = self.network
359 if commit:
360 instance.save()
361 return instance
364class GroupConfigForm(forms.ModelForm):
365 class Meta:
366 model = GroupConfig
367 fields = ["key", "value"]
368 widgets = {
369 "value": forms.Textarea(attrs={"rows": 4}),
370 }
372 def clean_value(self):
373 value = self.cleaned_data["value"]
374 key = self.cleaned_data.get("key")
376 if not key:
377 return value
379 # Validation based on https://docs.defined.net/api/tag-create/
380 port_fields = [
381 "lighthouse.dns.port",
382 "listen.port",
383 ]
384 boolean_fields = [
385 "lighthouse.serve_dns",
386 "punchy.punch",
387 "punchy.respond",
388 "stats.message_metrics",
389 "stats.lighthouse_metrics",
390 ]
391 interval_fields = [
392 "punchy.delay",
393 "punchy.respond_delay",
394 "stats.interval",
395 ]
397 # Validate ports
398 if key in port_fields:
399 try:
400 port = int(value)
401 if port < 0 or port > 65535:
402 raise forms.ValidationError("Port must be between 0 and 65535")
403 return str(port)
404 except ValueError:
405 raise forms.ValidationError("Port must be a valid integer")
407 # Validate booleans
408 elif key in boolean_fields:
409 if value.lower() not in ["true", "false"]:
410 raise forms.ValidationError("Value must be a boolean (true/false)")
411 return value.lower()
413 # Validate intervals
414 elif key in interval_fields:
415 try:
416 interval = int(value)
417 if interval < 0:
418 raise forms.ValidationError(
419 "Interval must be a non-negative integer"
420 )
421 return str(interval)
422 except ValueError:
423 raise forms.ValidationError("Interval must be a valid integer")
425 # Validate stats type
426 elif key == "stats.type":
427 valid_types = ["graphite", "prometheus"]
428 if value.lower() not in valid_types:
429 raise forms.ValidationError(
430 f"Stats type must be one of: {', '.join(valid_types)}"
431 )
432 return value.lower()
434 # Validate stats protocol
435 elif key == "stats.protocol":
436 valid_protocols = ["tcp", "udp"]
437 if value.lower() not in valid_protocols:
438 raise forms.ValidationError(
439 f"Protocol must be one of: {', '.join(valid_protocols)}"
440 )
441 return value.lower()
443 return value