Coverage for src/meshadmin/server/networks/forms.py: 82%

233 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-25 08:49 +0200

1import ipaddress 

2from datetime import timedelta 

3 

4import structlog 

5from django import forms 

6from django.contrib.auth import get_user_model 

7from django.core.exceptions import ValidationError 

8from django.shortcuts import get_object_or_404 

9from django.utils import timezone 

10 

11from meshadmin.server.networks.models import ( 

12 CA, 

13 Group, 

14 GroupConfig, 

15 Host, 

16 Network, 

17 NetworkMembership, 

18 Rule, 

19 Template, 

20) 

21from meshadmin.server.networks.services import ( 

22 create_network, 

23 create_network_ca, 

24 network_available_hosts_iterator, 

25) 

26 

# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)

# User model class as returned by Django's get_user_model().
User = get_user_model()

# (CIDR, human-readable description) pairs. NetworkForm.clean_cidr accepts a
# network only if it is a subnet of one of these ranges, and quotes the
# descriptions back to the user as suggestions otherwise.
RECOMMENDED_RANGES = [
    ("192.168.0.0/16", "Private network range, ideal for small networks"),
    ("172.16.0.0/12", "Private network range, good for medium networks"),
    ("10.0.0.0/8", "Private network range, suitable for large networks"),
    ("100.64.0.0/10", "Carrier-grade NAT range, recommended by Nebula docs"),
]

37 

38 

class NetworkForm(forms.ModelForm):
    """Form for creating or editing a ``Network``.

    Accepts an optional ``request`` keyword argument. When a new network is
    saved, ``request.user`` is handed to ``create_network``.
    """

    class Meta:
        model = Network
        fields = ("name", "cidr", "update_interval")
        widgets = {
            "update_interval": forms.NumberInput(attrs={"min": 5, "max": 3600}),
        }

    def __init__(self, *args, **kwargs):
        # ``request`` is not a ModelForm argument, so remove it before
        # delegating to the base class.
        self.request = kwargs.pop("request", None)
        super().__init__(*args, **kwargs)

    def clean_cidr(self):
        """Validate that ``cidr`` parses and lies inside a recommended range.

        Returns:
            The submitted CIDR value, unchanged.

        Raises:
            ValidationError: If the value is not a valid network, or if it is
                not a subnet of any entry in ``RECOMMENDED_RANGES``.
        """
        cidr = self.cleaned_data["cidr"]

        try:
            network = ipaddress.ip_network(cidr)
        except ValueError as e:
            raise ValidationError(f"Invalid CIDR format: {str(e)}") from e

        # Parse each recommended range once and reuse it for both checks.
        recommended = [
            (ipaddress.ip_network(recommended_cidr), recommended_cidr, description)
            for recommended_cidr, description in RECOMMENDED_RANGES
        ]

        # subnet_of() raises TypeError when the two networks have different IP
        # versions (e.g. an IPv6 CIDR against these IPv4 ranges), so compare
        # versions first and treat a mismatch as "not inside this range".
        in_recommended_range = any(
            network.version == rec_network.version and network.subnet_of(rec_network)
            for rec_network, _, _ in recommended
        )
        if in_recommended_range:
            return cidr

        # Prefer ranges large enough to hold a network of the requested size.
        suggestions = [
            f"{recommended_cidr} - {description}"
            for rec_network, recommended_cidr, description in recommended
            if network.num_addresses <= rec_network.num_addresses
        ]
        if not suggestions:
            # Nothing is big enough (e.g. 0.0.0.0/0 or an IPv6 network); list
            # every range rather than ending the message with an empty list.
            suggestions = [
                f"{recommended_cidr} - {description}"
                for _, recommended_cidr, description in recommended
            ]

        raise ValidationError(
            f"Warning: The CIDR {cidr} is outside recommended network ranges. "
            "Consider using one of these ranges instead:\n"
            + "\n".join(suggestions)
        )

    def save(self, commit=True):
        """Persist the network.

        A network without a primary key is created through ``create_network``
        (regardless of ``commit``); an existing one is saved only when
        ``commit`` is true.

        Returns:
            The created or updated network instance.
        """
        logger.info("save network")
        instance = super().save(commit=False)
        if not instance.pk:
            instance = create_network(
                network_name=self.cleaned_data["name"],
                network_cidr=self.cleaned_data["cidr"],
                update_interval=self.cleaned_data["update_interval"],
                user=self.request.user,
            )
        elif commit:
            instance.save()

        # save_m2m is attached by super().save(commit=False) above.
        self.save_m2m()
        return instance

98 

99 

class CAForm(forms.ModelForm):
    """Form for a ``CA``; new CAs are created via ``create_network_ca``.

    An optional ``network`` keyword argument pins the CA to that network and
    removes the ``network`` field from the form.
    """

    class Meta:
        model = CA
        fields = ("name", "network")

    def __init__(self, *args, **kwargs):
        preset_network = kwargs.pop("network", None)
        super().__init__(*args, **kwargs)

        if not preset_network:
            return

        # The network is fixed by the caller, so it is not user-editable.
        del self.fields["network"]
        self.instance.network = preset_network

    def save(self, commit=True):
        """Create a new CA through the service, or save an existing one.

        Returns:
            The CA returned by ``create_network_ca`` for a new record,
            otherwise the (optionally saved) existing instance.
        """
        ca = super().save(commit=False)

        if not ca.pk:
            return create_network_ca(ca.name, ca.network)

        if commit:
            ca.save()
        return ca

123 

124 

class GroupForm(forms.ModelForm):
    """Form for a ``Group``'s name and description.

    An optional ``network`` keyword argument is assigned to the instance, since
    the network itself is not one of the form fields.
    """

    class Meta:
        model = Group
        fields = ("name", "description")

    def __init__(self, *args, **kwargs):
        owning_network = kwargs.pop("network", None)
        super().__init__(*args, **kwargs)

        if not owning_network:
            return
        self.instance.network = owning_network

136 

137 

class TemplateForm(forms.ModelForm):
    """Form for a ``Template``, with expiry entered as a number of days.

    ``expiry_days`` is a form-only field that is translated to and from the
    instance's ``expires_at`` timestamp. An optional ``network`` keyword
    argument pins the template to that network and removes the ``network``
    field from the form.
    """

    expiry_days = forms.IntegerField(
        required=False,
        min_value=1,
        label="Expires in (days)",
        help_text="Days until the key expires. Leave empty for no expiration.",
    )

    class Meta:
        model = Template
        fields = (
            "name",
            "network",
            "is_lighthouse",
            "is_relay",
            "use_relay",
            "groups",
            "reusable",
            "usage_limit",
            "ephemeral_peers",
        )

    def __init__(self, *args, **kwargs):
        network = kwargs.pop("network", None)
        super().__init__(*args, **kwargs)

        # Pre-fill the remaining whole days for a saved, not-yet-expired
        # template.
        if self.instance and self.instance.pk and self.instance.expires_at:
            days_remaining = (self.instance.expires_at - timezone.now()).days
            if days_remaining > 0:
                self.fields["expiry_days"].initial = days_remaining

        # Work out which network's groups may be offered, in priority order:
        # explicit ``network`` argument, the saved instance, then ``initial``.
        network_id = None
        if network:
            network_id = network.id
            self.fields.pop("network")
            self.instance.network = network
        elif self.instance and self.instance.pk:
            network_id = self.instance.network_id
        else:
            # ``initial`` may be absent or explicitly None.
            network_id = (kwargs.get("initial") or {}).get("network_id")

        if network_id:
            self.fields["groups"].queryset = Group.objects.filter(
                network_id=network_id
            )

    def save(self, commit=True):
        """Save the template, translating ``expiry_days`` into ``expires_at``.

        ``expires_at`` is only rewritten when the user actually changed
        ``expiry_days`` (or the template is new). The pre-filled value is the
        floored number of days remaining, so re-applying it on every save
        would move the expiry each time an unrelated field is edited.

        Returns:
            The template instance (saved, including many-to-many data, when
            ``commit`` is true).
        """
        instance = super().save(commit=False)

        expiry_days = self.cleaned_data.get("expiry_days")
        expiry_changed = "expiry_days" in self.changed_data
        if expiry_days and (expiry_changed or instance._state.adding):
            instance.expires_at = timezone.now() + timedelta(days=expiry_days)
        elif expiry_changed:
            # The field was cleared: remove the expiration.
            instance.expires_at = None

        if commit:
            instance.save()
            self.save_m2m()

        return instance

198 

199 

class HostForm(forms.ModelForm):
    """Form for a ``Host``.

    An optional ``network`` keyword argument pins the host to that network and
    removes the ``network`` field from the form. For a new host the form
    pre-fills ``assigned_ip`` with the first address yielded by
    ``network_available_hosts_iterator``.
    """

    class Meta:
        model = Host
        fields = (
            "name",
            "network",
            "assigned_ip",
            "is_lighthouse",
            "is_relay",
            "use_relay",
            "groups",
            "public_ip_or_hostname",
            "public_auth_key",
            "interface",
        )

    def __init__(self, *args, **kwargs):
        network = kwargs.pop("network", None)
        super().__init__(*args, **kwargs)

        network_id = None
        if network:
            network_id = network.id
            self.fields.pop("network")
            self.instance.network = network
        elif self.instance and self.instance.pk:
            network_id = self.instance.network_id

        if network_id:
            # Only groups of the host's own network may be selected.
            self.fields["groups"].queryset = Group.objects.filter(
                network_id=network_id
            )

        # Suggest an address for a new host. This is skipped when the network
        # is unknown, because there is nothing to look up.
        if network_id and not self.instance.pk:
            # Reuse the network object the caller passed instead of
            # re-querying it.
            target_network = network if network else Network.objects.get(id=network_id)
            # A default avoids StopIteration escaping __init__ when the
            # iterator yields nothing; the field is then simply left empty.
            suggested_ip = next(network_available_hosts_iterator(target_network), None)
            if suggested_ip is not None:
                self.initial["assigned_ip"] = suggested_ip

237 

238 

class RuleForm(forms.ModelForm):
    """Form for a firewall ``Rule`` belonging to a security group.

    The owning security group is taken from ``initial["security_group_id"]``
    or, failing that, from the ``instance`` keyword argument. It restricts the
    ``group``/``groups`` choices to groups of the same network.
    """

    class Meta:
        model = Rule
        fields = (
            "direction",
            "proto",
            "port",
            "group",
            "groups",
            "cidr",
            "local_cidr",
        )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # ``initial`` may be absent or explicitly None.
        security_group_id = (kwargs.get("initial") or {}).get("security_group_id")
        if not security_group_id and kwargs.get("instance"):
            security_group_id = kwargs["instance"].security_group_id

        if security_group_id:
            security_group = get_object_or_404(Group, id=security_group_id)
            group_queryset = Group.objects.filter(network_id=security_group.network_id)
            self.fields["group"].queryset = group_queryset
            self.fields["groups"].queryset = group_queryset

    def clean(self):
        """Cross-field validation of port, target and CIDR values.

        Returns:
            The cleaned data dictionary.

        Raises:
            ValidationError: If the port is not ``any``/``fragment``/a number
                or an ascending range within 0-65535; if none of ``group``,
                ``groups`` or ``cidr`` is given; or if ``cidr``/``local_cidr``
                is not a valid network.
        """
        cleaned_data = super().clean()

        # Validate port format
        port = cleaned_data.get("port")
        if port and port not in ("0", "any", "fragment"):
            if "-" in port:
                try:
                    start, end = map(int, port.split("-"))
                    if not (0 <= start <= 65535 and 0 <= end <= 65535):
                        raise ValueError
                except ValueError:
                    raise ValidationError(
                        {
                            "port": "Port range must be two valid port numbers (0-65535) separated by a hyphen"
                        }
                    )
                # A reversed range such as "200-100" is rejected here rather
                # than stored.
                if start > end:
                    raise ValidationError(
                        {"port": "Port range start must not be greater than its end"}
                    )
            elif not port.isdigit() or not 0 <= int(port) <= 65535:
                raise ValidationError(
                    {
                        "port": "Port must be 'any', 'fragment', or a number between 0 and 65535"
                    }
                )

        # Validate that at least one target specification exists
        group = cleaned_data.get("group")
        groups = cleaned_data.get("groups")
        cidr = cleaned_data.get("cidr")

        # ``groups`` is missing when its own field validation failed. Use
        # exists() directly instead of evaluating the queryset first.
        has_groups = groups is not None and groups.exists()
        if not (group or has_groups or cidr):
            raise ValidationError(
                "At least one of group, groups, or CIDR must be specified to identify "
                "which hosts the rule applies to."
            )

        # Validate CIDR format if provided
        if cidr:
            try:
                ipaddress.ip_network(cidr)
            except ValueError:
                raise ValidationError(
                    {"cidr": "Invalid CIDR format. Example: 0.0.0.0/0"}
                )

        # Validate local_cidr format if provided
        local_cidr = cleaned_data.get("local_cidr")
        if local_cidr:
            try:
                ipaddress.ip_network(local_cidr)
            except ValueError:
                raise ValidationError(
                    {"local_cidr": "Invalid CIDR format. Example: 0.0.0.0/0"}
                )

        return cleaned_data

319 

320 

class NetworkMembershipForm(forms.ModelForm):
    """Form for adding a user to a network by email, or editing their role.

    Takes an optional ``network`` keyword argument identifying the network the
    membership belongs to. When editing an existing membership the email is
    shown read-only.
    """

    email = forms.EmailField(
        help_text="Enter the email address of the user you want to add to the network"
    )

    class Meta:
        model = NetworkMembership
        fields = ["role"]

    def __init__(self, *args, **kwargs):
        self.network = kwargs.pop("network", None)
        super().__init__(*args, **kwargs)
        if self.instance.pk:
            # Existing membership: the user cannot be changed, only the role.
            self.fields["email"].initial = self.instance.user.email
            self.fields["email"].disabled = True

    def clean_email(self):
        """Resolve the email to a user and reject duplicate memberships.

        Returns:
            The validated email address.

        Raises:
            ValidationError: If no user has this email, or another membership
                for that user already exists in ``self.network``.
        """
        email = self.cleaned_data["email"]
        user = User.objects.filter(email=email).first()
        if not user:
            raise ValidationError("No user found with this email address")

        if self.network:
            existing = NetworkMembership.objects.filter(
                network=self.network, user=user
            )
            if self.instance.pk:
                # Do not count the membership being edited as a duplicate;
                # otherwise every role change would be rejected.
                existing = existing.exclude(pk=self.instance.pk)
            if existing.exists():
                raise ValidationError("This user is already a member of the network")

        # Remember the resolved user so save() attaches exactly this one.
        self._resolved_user = user
        return email

    def save(self, commit=True):
        """Save the membership, attaching user and network for new records.

        Returns:
            The membership instance (saved when ``commit`` is true).
        """
        instance = super().save(commit=False)
        if not instance.pk:
            user = getattr(self, "_resolved_user", None)
            if user is None:
                # Same lookup as clean_email, so duplicate emails resolve to
                # the same user instead of raising MultipleObjectsReturned.
                user = User.objects.filter(email=self.cleaned_data["email"]).first()
            instance.user = user
            instance.network = self.network
        if commit:
            instance.save()
        return instance

361 

362 

363class GroupConfigForm(forms.ModelForm): 

364 class Meta: 

365 model = GroupConfig 

366 fields = ["key", "value"] 

367 widgets = { 

368 "value": forms.Textarea(attrs={"rows": 4}), 

369 } 

370 

371 def clean_value(self): 

372 value = self.cleaned_data["value"] 

373 key = self.cleaned_data.get("key") 

374 

375 if not key: 

376 return value 

377 

378 # Validate value based on the selected key 

379 if key == "lighthouse.interval": 

380 try: 

381 interval = int(value) 

382 if interval < 1: 

383 raise forms.ValidationError("Interval must be a positive integer") 

384 return str(interval) 

385 except ValueError: 

386 raise forms.ValidationError("Interval must be a valid integer") 

387 

388 elif key == "lighthouse.dns.port": 

389 try: 

390 port = int(value) 

391 if port < 1 or port > 65535: 

392 raise forms.ValidationError("Port must be between 1 and 65535") 

393 return str(port) 

394 except ValueError: 

395 raise forms.ValidationError("Port must be a valid integer") 

396 

397 elif key == "lighthouse.serve_dns": 

398 if value.lower() not in ["true", "false"]: 

399 raise forms.ValidationError("Value must be a boolean (true/false)") 

400 return value.lower()