Coverage for src/meshadmin/server/networks/forms.py: 88%

247 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-09 15:09 +0200

1import ipaddress 

2from datetime import timedelta 

3 

4import structlog 

5from django import forms 

6from django.contrib.auth import get_user_model 

7from django.core.exceptions import ValidationError 

8from django.shortcuts import get_object_or_404 

9from django.utils import timezone 

10 

11from meshadmin.server.networks.models import ( 

12 CA, 

13 Group, 

14 GroupConfig, 

15 Host, 

16 Network, 

17 NetworkMembership, 

18 Rule, 

19 Template, 

20) 

21from meshadmin.server.networks.services import ( 

22 create_network, 

23 create_network_ca, 

24 network_available_hosts_iterator, 

25) 

26 

# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)

# The user model configured for the project, resolved once at import time.
User = get_user_model()

30 

# (CIDR, human-readable description) pairs. NetworkForm.clean_cidr accepts a
# network only if it is contained in one of these ranges, and uses the
# descriptions when suggesting alternatives.
RECOMMENDED_RANGES: list[tuple[str, str]] = [
    (
        "192.168.0.0/16",
        "Private network range, ideal for small networks",
    ),
    (
        "172.16.0.0/12",
        "Private network range, good for medium networks",
    ),
    (
        "10.0.0.0/8",
        "Private network range, suitable for large networks",
    ),
    (
        "100.64.0.0/10",
        "Carrier-grade NAT range, recommended by Nebula docs",
    ),
]

37 

38 

class NetworkForm(forms.ModelForm):
    """Create or edit a ``Network``.

    Accepts an optional ``request`` keyword argument; ``request.user`` is
    handed to ``create_network`` when a new network is saved.
    """

    class Meta:
        model = Network
        fields = ("name", "cidr", "update_interval")
        widgets = {
            "update_interval": forms.NumberInput(attrs={"min": 5, "max": 3600}),
        }

    def __init__(self, *args, **kwargs):
        """Pop the optional ``request`` kwarg before delegating to ModelForm."""
        self.request = kwargs.pop("request", None)
        super().__init__(*args, **kwargs)

    def clean_cidr(self):
        """Validate that ``cidr`` parses and lies inside a recommended range.

        Returns:
            The submitted CIDR value, unchanged.

        Raises:
            ValidationError: if the value is not a valid network, or if it is
                not a subnet of any entry in ``RECOMMENDED_RANGES``.
        """
        cidr = self.cleaned_data["cidr"]

        try:
            network = ipaddress.ip_network(cidr)
        except ValueError as e:
            raise ValidationError(f"Invalid CIDR format: {str(e)}") from e

        # Parse the recommended ranges once and reuse them for both the
        # containment check and the suggestion list.
        recommended = [
            (ipaddress.ip_network(recommended_cidr), recommended_cidr, description)
            for recommended_cidr, description in RECOMMENDED_RANGES
        ]

        # subnet_of() raises TypeError when the two networks differ in IP
        # version, so an IPv6 CIDR must be filtered out before the call
        # instead of crashing the form.
        in_recommended_range = any(
            network.version == rec_network.version and network.subnet_of(rec_network)
            for rec_network, _, _ in recommended
        )
        if in_recommended_range:
            return cidr

        # Only suggest ranges large enough to hold a network of this size.
        network_size = network.num_addresses
        suggestions = [
            f"{recommended_cidr} - {description}"
            for rec_network, recommended_cidr, description in recommended
            if network_size <= rec_network.num_addresses
        ]

        raise ValidationError(
            f"Warning: The CIDR {cidr} is outside recommended network ranges. "
            "Consider using one of these ranges instead:\n"
            + "\n".join(suggestions)
        )

    def save(self, commit=True):
        """Persist the network.

        An unsaved instance is created through ``create_network`` regardless
        of ``commit``; an existing instance is saved only when ``commit`` is
        true. ``save_m2m()`` is invoked in both cases.
        """
        logger.info("save network")
        instance = super().save(commit=False)
        if not instance.pk:
            instance = create_network(
                network_name=self.cleaned_data["name"],
                network_cidr=self.cleaned_data["cidr"],
                update_interval=self.cleaned_data["update_interval"],
                user=self.request.user,
            )
        elif commit:
            instance.save()

        self.save_m2m()
        return instance

98 

99 

class CAForm(forms.ModelForm):
    """Form for a ``CA``; the network may be fixed by the caller."""

    class Meta:
        model = CA
        fields = ("name", "network")

    def __init__(self, *args, **kwargs):
        """Accept an optional ``network`` kwarg that pins the CA's network.

        When given, the ``network`` field is removed from the form and the
        value is assigned directly to the instance.
        """
        fixed_network = kwargs.pop("network", None)
        super().__init__(*args, **kwargs)

        if not fixed_network:
            return

        self.fields.pop("network")
        self.instance.network = fixed_network

    def save(self, commit=True):
        """Save an existing CA, or create a new one via ``create_network_ca``.

        ``commit`` only affects the existing-instance path; a new CA is
        always produced by ``create_network_ca``.
        """
        ca = super().save(commit=False)

        if not ca.pk:
            return create_network_ca(ca.name, ca.network)

        if commit:
            ca.save()
        return ca

123 

124 

class GroupForm(forms.ModelForm):
    """Form for a ``Group``'s name and description."""

    class Meta:
        model = Group
        fields = ("name", "description")

    def __init__(self, *args, **kwargs):
        """Accept an optional ``network`` kwarg and attach it to the instance."""
        owning_network = kwargs.pop("network", None)
        super().__init__(*args, **kwargs)

        if not owning_network:
            return
        self.instance.network = owning_network

136 

137 

class TemplateForm(forms.ModelForm):
    """Form for a ``Template``, with expiry expressed as a number of days.

    ``expiry_days`` is a form-only field; it is translated to and from the
    model's ``expires_at`` timestamp.
    """

    expiry_days = forms.IntegerField(
        required=False,
        min_value=1,
        label="Expires in (days)",
        help_text="Days until the key expires. Leave empty for no expiration.",
    )

    class Meta:
        model = Template
        fields = (
            "name",
            "network",
            "is_lighthouse",
            "is_relay",
            "use_relay",
            "groups",
            "reusable",
            "usage_limit",
            "ephemeral_peers",
        )

    def __init__(self, *args, **kwargs):
        """Prefill ``expiry_days`` and restrict ``groups`` to one network.

        The network used to filter groups is taken, in order, from the
        ``network`` kwarg (which also removes the ``network`` field), the
        saved instance, or ``initial["network_id"]``.
        """
        network = kwargs.pop("network", None)
        super().__init__(*args, **kwargs)

        if self.instance and self.instance.pk and self.instance.expires_at:
            # timedelta.days floors, so this is the number of whole days left.
            days_remaining = (self.instance.expires_at - timezone.now()).days
            if days_remaining > 0:
                self.fields["expiry_days"].initial = days_remaining

        network_id = None
        if network:
            network_id = network.id
            self.fields.pop("network")
            self.instance.network = network
        elif self.instance and self.instance.pk:
            network_id = self.instance.network_id
        else:
            # Tolerate initial=None as well as a missing key.
            network_id = (kwargs.get("initial") or {}).get("network_id")

        if network_id:
            self.fields["groups"].queryset = Group.objects.filter(
                network_id=network_id
            )

    def save(self, commit=True):
        """Save the template, translating ``expiry_days`` into ``expires_at``.

        ``expires_at`` is recomputed only when the user changed
        ``expiry_days`` or the instance has no expiry yet. Re-saving an edit
        form with the prefilled (floored) day count therefore leaves the
        stored expiry untouched instead of shortening it on every edit.
        Clearing the field removes the expiry.
        """
        instance = super().save(commit=False)

        expiry_days = self.cleaned_data.get("expiry_days")
        expiry_changed = "expiry_days" in self.changed_data
        if expiry_days:
            if expiry_changed or not instance.expires_at:
                instance.expires_at = timezone.now() + timedelta(days=expiry_days)
        elif expiry_changed:
            instance.expires_at = None

        if commit:
            instance.save()
            self.save_m2m()

        return instance

198 

199 

class HostForm(forms.ModelForm):
    """Form for a ``Host``; group choices are limited to the host's network."""

    class Meta:
        model = Host
        fields = (
            "name",
            "network",
            "assigned_ip",
            "is_lighthouse",
            "is_relay",
            "use_relay",
            "groups",
            "public_ip_or_hostname",
            "public_auth_key",
            "interface",
        )

    def __init__(self, *args, **kwargs):
        """Bind the form to a network and suggest an IP for new hosts.

        An optional ``network`` kwarg pins the host's network and removes the
        ``network`` field. For an unsaved host, the first value yielded by
        ``network_available_hosts_iterator`` becomes the initial
        ``assigned_ip``.
        """
        network = kwargs.pop("network", None)
        super().__init__(*args, **kwargs)

        network_id = None
        if network:
            network_id = network.id
            self.fields.pop("network")
            self.instance.network = network
        elif self.instance and self.instance.pk:
            network_id = self.instance.network_id

        if not network_id:
            # Without a known network there is nothing to filter groups by
            # and no address pool to draw a suggestion from.
            return

        self.fields["groups"].queryset = Group.objects.filter(network_id=network_id)

        if not self.instance.pk:
            # An unsaved instance only has a network_id when ``network`` was
            # passed in, so the object is already at hand; no re-fetch needed.
            suggested_ip = next(network_available_hosts_iterator(network), None)
            # If the pool is exhausted, leave the field without a default
            # rather than letting StopIteration escape from __init__.
            if suggested_ip is not None:
                self.initial["assigned_ip"] = suggested_ip

237 

238 

class RuleForm(forms.ModelForm):
    """Form for a firewall ``Rule`` belonging to a security group."""

    class Meta:
        model = Rule
        fields = (
            "description",
            "direction",
            "proto",
            "port",
            "group",
            "groups",
            "cidr",
            "local_cidr",
        )

    def __init__(self, *args, **kwargs):
        """Limit ``group``/``groups`` choices to the security group's network.

        The security group id is read from ``initial["security_group_id"]``
        or, failing that, from the ``instance`` keyword argument.
        """
        super().__init__(*args, **kwargs)

        # ``initial`` may be absent or explicitly None.
        security_group_id = (kwargs.get("initial") or {}).get("security_group_id")
        if not security_group_id and kwargs.get("instance"):
            security_group_id = kwargs["instance"].security_group_id

        if security_group_id:
            security_group = get_object_or_404(Group, id=security_group_id)
            group_queryset = Group.objects.filter(network_id=security_group.network_id)
            self.fields["group"].queryset = group_queryset
            self.fields["groups"].queryset = group_queryset

    def clean(self):
        """Cross-field validation for port, target selection and CIDRs.

        Raises:
            ValidationError: for a malformed port or port range, when none of
                ``group``, ``groups`` or ``cidr`` is given, or when ``cidr`` /
                ``local_cidr`` is not a valid network.
        """
        cleaned_data = super().clean()

        # Validate port format
        port = cleaned_data.get("port")
        if port and port not in ("0", "any", "fragment"):
            if "-" in port:
                try:
                    start, end = map(int, port.split("-"))
                    if not (0 <= start <= 65535 and 0 <= end <= 65535):
                        raise ValueError
                except ValueError as exc:
                    raise ValidationError(
                        {
                            "port": "Port range must be two valid port numbers (0-65535) separated by a hyphen"
                        }
                    ) from exc
                if start > end:
                    # A reversed range such as "200-100" matches no port.
                    raise ValidationError(
                        {"port": "Port range start must not be greater than its end"}
                    )
            # str.isdigit() alone accepts characters such as "²" that int()
            # rejects with ValueError, so restrict the check to ASCII digits.
            elif not (port.isascii() and port.isdigit()) or not 0 <= int(port) <= 65535:
                raise ValidationError(
                    {
                        "port": "Port must be 'any', 'fragment', or a number between 0 and 65535"
                    }
                )

        # Validate that at least one target specification exists
        group = cleaned_data.get("group")
        groups = cleaned_data.get("groups")
        cidr = cleaned_data.get("cidr")
        has_groups = groups is not None and groups.exists()

        if not (group or has_groups or cidr):
            raise ValidationError(
                "At least one of group, groups, or CIDR must be specified to identify "
                "which hosts the rule applies to."
            )

        # Validate CIDR format for both CIDR fields, failing on the first
        # invalid one (cidr is checked before local_cidr).
        for field_name in ("cidr", "local_cidr"):
            candidate = cleaned_data.get(field_name)
            if not candidate:
                continue
            try:
                ipaddress.ip_network(candidate)
            except ValueError as exc:
                raise ValidationError(
                    {field_name: "Invalid CIDR format. Example: 0.0.0.0/0"}
                ) from exc

        return cleaned_data

320 

321 

class NetworkMembershipForm(forms.ModelForm):
    """Add a user to a network by email, or edit an existing membership's role.

    Accepts an optional ``network`` keyword argument naming the network the
    membership belongs to.
    """

    email = forms.EmailField(
        help_text="Enter the email address of the user you want to add to the network"
    )

    class Meta:
        model = NetworkMembership
        fields = ["role"]

    def __init__(self, *args, **kwargs):
        """Store the target network; lock the email field when editing."""
        self.network = kwargs.pop("network", None)
        # Filled in by clean_email so save() does not need a second lookup.
        self._resolved_user = None
        super().__init__(*args, **kwargs)
        if self.instance.pk:
            self.fields["email"].initial = self.instance.user.email
            self.fields["email"].disabled = True

    def clean_email(self):
        """Resolve the email to a user and reject duplicate memberships.

        Raises:
            ValidationError: if no user has this email, or if that user
                already has a *different* membership in ``self.network``.
        """
        email = self.cleaned_data["email"]
        user = User.objects.filter(email=email).first()
        if not user:
            raise ValidationError("No user found with this email address")

        if self.network:
            duplicates = NetworkMembership.objects.filter(
                network=self.network, user=user
            )
            # This method also runs for the disabled email field on edit;
            # the membership being edited must not count as a duplicate.
            if self.instance.pk:
                duplicates = duplicates.exclude(pk=self.instance.pk)
            if duplicates.exists():
                raise ValidationError("This user is already a member of the network")

        self._resolved_user = user
        return email

    def save(self, commit=True):
        """Save the membership; a new one gets its user and network set first."""
        instance = super().save(commit=False)
        if not instance.pk:
            # Reuse the user found during validation. The fallback mirrors
            # clean_email's lookup so duplicate emails cannot raise
            # MultipleObjectsReturned here.
            user = self._resolved_user
            if user is None:
                user = User.objects.filter(email=self.cleaned_data["email"]).first()
            instance.user = user
            instance.network = self.network
        if commit:
            instance.save()
        return instance

362 

363 

class GroupConfigForm(forms.ModelForm):
    """Form for a single ``GroupConfig`` key/value pair."""

    class Meta:
        model = GroupConfig
        fields = ["key", "value"]
        widgets = {
            "value": forms.Textarea(attrs={"rows": 4}),
        }

    def clean_value(self):
        """Validate and normalise ``value`` according to the selected ``key``.

        Port and interval keys must hold integers (returned as their decimal
        string form); boolean and enumerated keys are returned lower-cased.
        Values for any other key, or when ``key`` is missing, pass through
        unchanged.

        Raises:
            forms.ValidationError: if the value does not fit the key's type.
        """
        raw = self.cleaned_data["value"]
        config_key = self.cleaned_data.get("key")

        if not config_key:
            return raw

        # Validation based on https://docs.defined.net/api/tag-create/
        port_keys = ("lighthouse.dns.port", "listen.port")
        boolean_keys = (
            "lighthouse.serve_dns",
            "punchy.punch",
            "punchy.respond",
            "stats.message_metrics",
            "stats.lighthouse_metrics",
        )
        interval_keys = ("punchy.delay", "punchy.respond_delay", "stats.interval")
        # key -> (label used in the error message, allowed values)
        enumerated_keys = {
            "stats.type": ("Stats type", ["graphite", "prometheus"]),
            "stats.protocol": ("Protocol", ["tcp", "udp"]),
        }

        if config_key in port_keys:
            try:
                port_number = int(raw)
            except ValueError:
                raise forms.ValidationError("Port must be a valid integer")
            if not 0 <= port_number <= 65535:
                raise forms.ValidationError("Port must be between 0 and 65535")
            return str(port_number)

        if config_key in boolean_keys:
            lowered = raw.lower()
            if lowered not in ["true", "false"]:
                raise forms.ValidationError("Value must be a boolean (true/false)")
            return lowered

        if config_key in interval_keys:
            try:
                seconds = int(raw)
            except ValueError:
                raise forms.ValidationError("Interval must be a valid integer")
            if seconds < 0:
                raise forms.ValidationError(
                    "Interval must be a non-negative integer"
                )
            return str(seconds)

        if config_key in enumerated_keys:
            label, allowed = enumerated_keys[config_key]
            lowered = raw.lower()
            if lowered not in allowed:
                raise forms.ValidationError(
                    f"{label} must be one of: {', '.join(allowed)}"
                )
            return lowered

        return raw