Coverage for src/meshadmin/server/networks/forms.py: 82%
233 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-06 11:34 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-06 11:34 +0200
1import ipaddress
2from datetime import timedelta
4import structlog
5from django import forms
6from django.contrib.auth import get_user_model
7from django.core.exceptions import ValidationError
8from django.shortcuts import get_object_or_404
9from django.utils import timezone
11from meshadmin.server.networks.models import (
12 CA,
13 Group,
14 GroupConfig,
15 Host,
16 Network,
17 NetworkMembership,
18 Rule,
19 Template,
20)
21from meshadmin.server.networks.services import (
22 create_network,
23 create_network_ca,
24 network_available_hosts_iterator,
25)
27logger = structlog.get_logger(__name__)
29User = get_user_model()
31RECOMMENDED_RANGES = [
32 ("192.168.0.0/16", "Private network range, ideal for small networks"),
33 ("172.16.0.0/12", "Private network range, good for medium networks"),
34 ("10.0.0.0/8", "Private network range, suitable for large networks"),
35 ("100.64.0.0/10", "Carrier-grade NAT range, recommended by Nebula docs"),
36]
39class NetworkForm(forms.ModelForm):
40 class Meta:
41 model = Network
42 fields = ("name", "cidr", "update_interval")
43 widgets = {
44 "update_interval": forms.NumberInput(attrs={"min": 5, "max": 3600}),
45 }
47 def __init__(self, *args, **kwargs):
48 self.request = kwargs.pop("request", None)
49 super().__init__(*args, **kwargs)
51 def clean_cidr(self):
52 cidr = self.cleaned_data["cidr"]
54 try:
55 network = ipaddress.ip_network(cidr)
56 except ValueError as e:
57 raise ValidationError(f"Invalid CIDR format: {str(e)}")
59 in_recommended_range = False
60 for recommended_cidr, _ in RECOMMENDED_RANGES:
61 if ipaddress.ip_network(cidr).subnet_of(
62 ipaddress.ip_network(recommended_cidr)
63 ):
64 in_recommended_range = True
65 break
67 if not in_recommended_range:
68 suggestions = []
69 network_size = network.num_addresses
70 for recommended_cidr, description in RECOMMENDED_RANGES:
71 rec_network = ipaddress.ip_network(recommended_cidr)
72 if network_size <= rec_network.num_addresses:
73 suggestions.append(f"{recommended_cidr} - {description}")
75 raise ValidationError(
76 f"Warning: The CIDR {cidr} is outside recommended network ranges. "
77 "Consider using one of these ranges instead:\n"
78 + "\n".join(f"• {suggestion}" for suggestion in suggestions)
79 )
81 return cidr
83 def save(self, commit=True):
84 logger.info("save network")
85 instance = super().save(commit=False)
86 if not instance.pk:
87 instance = create_network(
88 network_name=self.cleaned_data["name"],
89 network_cidr=self.cleaned_data["cidr"],
90 update_interval=self.cleaned_data["update_interval"],
91 user=self.request.user,
92 )
93 elif commit:
94 instance.save()
96 self.save_m2m()
97 return instance
100class CAForm(forms.ModelForm):
101 class Meta:
102 model = CA
103 fields = ("name", "network")
105 def __init__(self, *args, **kwargs):
106 network = kwargs.pop("network", None)
107 super().__init__(*args, **kwargs)
109 if network:
110 self.fields.pop("network")
111 self.instance.network = network
113 def save(self, commit=True):
114 instance = super().save(commit=False)
116 if instance.pk:
117 if commit:
118 instance.save()
119 else:
120 instance = create_network_ca(instance.name, instance.network)
122 return instance
125class GroupForm(forms.ModelForm):
126 class Meta:
127 model = Group
128 fields = ("name", "description")
130 def __init__(self, *args, **kwargs):
131 network = kwargs.pop("network", None)
132 super().__init__(*args, **kwargs)
134 if network:
135 self.instance.network = network
138class TemplateForm(forms.ModelForm):
139 expiry_days = forms.IntegerField(
140 required=False,
141 min_value=1,
142 label="Expires in (days)",
143 help_text="Days until the key expires. Leave empty for no expiration.",
144 )
146 class Meta:
147 model = Template
148 fields = (
149 "name",
150 "network",
151 "is_lighthouse",
152 "is_relay",
153 "use_relay",
154 "groups",
155 "reusable",
156 "usage_limit",
157 "ephemeral_peers",
158 )
160 def __init__(self, *args, **kwargs):
161 network = kwargs.pop("network", None)
162 super().__init__(*args, **kwargs)
164 if self.instance and self.instance.pk and self.instance.expires_at:
165 days_remaining = (self.instance.expires_at - timezone.now()).days
166 if days_remaining > 0:
167 self.fields["expiry_days"].initial = days_remaining
169 network_id = None
170 if network:
171 network_id = network.id
172 self.fields.pop("network")
173 self.instance.network = network
174 elif self.instance and self.instance.pk:
175 network_id = self.instance.network_id
176 elif "initial" in kwargs and "network_id" in kwargs["initial"]:
177 network_id = kwargs["initial"]["network_id"]
179 if network_id:
180 self.fields["groups"].queryset = Group.objects.filter(
181 network_id=network_id
182 ).all()
184 def save(self, commit=True):
185 instance = super().save(commit=False)
187 expiry_days = self.cleaned_data.get("expiry_days")
188 if expiry_days:
189 instance.expires_at = timezone.now() + timedelta(days=expiry_days)
190 elif "expiry_days" in self.changed_data:
191 instance.expires_at = None
193 if commit:
194 instance.save()
195 self.save_m2m()
197 return instance
200class HostForm(forms.ModelForm):
201 class Meta:
202 model = Host
203 fields = (
204 "name",
205 "network",
206 "assigned_ip",
207 "is_lighthouse",
208 "is_relay",
209 "use_relay",
210 "groups",
211 "public_ip_or_hostname",
212 "public_auth_key",
213 "interface",
214 )
216 def __init__(self, *args, **kwargs):
217 network = kwargs.pop("network", None)
218 super().__init__(*args, **kwargs)
220 network_id = None
221 if network:
222 network_id = network.id
223 self.fields.pop("network")
224 self.instance.network = network
225 elif self.instance and self.instance.pk:
226 network_id = self.instance.network_id
228 if network_id:
229 self.fields["groups"].queryset = Group.objects.filter(
230 network_id=network_id
231 ).all()
233 if not self.instance.pk:
234 network = Network.objects.get(id=network_id)
235 ipv4_iterator = network_available_hosts_iterator(network)
236 self.initial["assigned_ip"] = next(ipv4_iterator)
239class RuleForm(forms.ModelForm):
240 class Meta:
241 model = Rule
242 fields = (
243 "direction",
244 "proto",
245 "port",
246 "group",
247 "groups",
248 "cidr",
249 "local_cidr",
250 )
252 def __init__(self, *args, **kwargs):
253 super().__init__(*args, **kwargs)
254 security_group_id = kwargs.get("initial", {}).get("security_group_id")
255 if not security_group_id and kwargs.get("instance"):
256 security_group_id = kwargs["instance"].security_group_id
258 if security_group_id:
259 security_group = get_object_or_404(Group, id=security_group_id)
260 group_queryset = Group.objects.filter(network_id=security_group.network_id)
261 self.fields["group"].queryset = group_queryset
262 self.fields["groups"].queryset = group_queryset
264 def clean(self):
265 cleaned_data = super().clean()
267 # Validate port format
268 port = cleaned_data.get("port")
269 if port and port not in ("0", "any", "fragment"):
270 if "-" in port:
271 try:
272 start, end = map(int, port.split("-"))
273 if not (0 <= start <= 65535 and 0 <= end <= 65535):
274 raise ValueError
275 except ValueError:
276 raise ValidationError(
277 {
278 "port": "Port range must be two valid port numbers (0-65535) separated by a hyphen"
279 }
280 )
281 elif not port.isdigit() or not 0 <= int(port) <= 65535:
282 raise ValidationError(
283 {
284 "port": "Port must be 'any', 'fragment', or a number between 0 and 65535"
285 }
286 )
288 # Validate that at least one target specification exists
289 group = cleaned_data.get("group")
290 groups = cleaned_data.get("groups")
291 cidr = cleaned_data.get("cidr")
293 if not any([group, groups.exists() if groups else False, cidr]):
294 raise ValidationError(
295 "At least one of group, groups, or CIDR must be specified to identify "
296 "which hosts the rule applies to."
297 )
299 # Validate CIDR format if provided
300 if cidr:
301 try:
302 ipaddress.ip_network(cidr)
303 except ValueError:
304 raise ValidationError(
305 {"cidr": "Invalid CIDR format. Example: 0.0.0.0/0"}
306 )
308 # Validate local_cidr format if provided
309 local_cidr = cleaned_data.get("local_cidr")
310 if local_cidr:
311 try:
312 ipaddress.ip_network(local_cidr)
313 except ValueError:
314 raise ValidationError(
315 {"local_cidr": "Invalid CIDR format. Example: 0.0.0.0/0"}
316 )
318 return cleaned_data
321class NetworkMembershipForm(forms.ModelForm):
322 email = forms.EmailField(
323 help_text="Enter the email address of the user you want to add to the network"
324 )
326 class Meta:
327 model = NetworkMembership
328 fields = ["role"]
330 def __init__(self, *args, **kwargs):
331 self.network = kwargs.pop("network", None)
332 super().__init__(*args, **kwargs)
333 if self.instance.pk:
334 self.fields["email"].initial = self.instance.user.email
335 self.fields["email"].disabled = True
337 def clean_email(self):
338 email = self.cleaned_data["email"]
339 user = User.objects.filter(email=email).first()
340 if not user:
341 raise ValidationError("No user found with this email address")
342 if (
343 self.network
344 and NetworkMembership.objects.filter(
345 network=self.network, user=user
346 ).exists()
347 ):
348 raise ValidationError("This user is already a member of the network")
349 return email
351 def save(self, commit=True):
352 instance = super().save(commit=False)
353 if not instance.pk:
354 email = self.cleaned_data["email"]
355 user = User.objects.get(email=email)
356 instance.user = user
357 instance.network = self.network
358 if commit:
359 instance.save()
360 return instance
363class GroupConfigForm(forms.ModelForm):
364 class Meta:
365 model = GroupConfig
366 fields = ["key", "value"]
367 widgets = {
368 "value": forms.Textarea(attrs={"rows": 4}),
369 }
371 def clean_value(self):
372 value = self.cleaned_data["value"]
373 key = self.cleaned_data.get("key")
375 if not key:
376 return value
378 # Validate value based on the selected key
379 if key == "lighthouse.interval":
380 try:
381 interval = int(value)
382 if interval < 1:
383 raise forms.ValidationError("Interval must be a positive integer")
384 return str(interval)
385 except ValueError:
386 raise forms.ValidationError("Interval must be a valid integer")
388 elif key == "lighthouse.dns.port":
389 try:
390 port = int(value)
391 if port < 1 or port > 65535:
392 raise forms.ValidationError("Port must be between 1 and 65535")
393 return str(port)
394 except ValueError:
395 raise forms.ValidationError("Port must be a valid integer")
397 elif key == "lighthouse.serve_dns":
398 if value.lower() not in ["true", "false"]:
399 raise forms.ValidationError("Value must be a boolean (true/false)")
400 return value.lower()