Coverage for src/meshadmin/server/networks/tests/test_services.py: 100%
240 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-22 07:26 +0200
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-22 07:26 +0200
1from datetime import timedelta
3import pytest
4import yaml
5from django.db import IntegrityError
6from django.utils import timezone
7from jwcrypto.jwk import JWK
9from meshadmin.common.utils import create_keys
10from meshadmin.server.networks.models import GroupConfig, Host, Rule
11from meshadmin.server.networks.services import (
12 apply_group_config_overrides,
13 create_available_hosts_iterator,
14 create_group,
15 create_network_ca,
16 create_template,
17 enrollment,
18 generate_config_yaml,
19 generate_enrollment_token,
20 network_available_hosts_iterator,
21)
def test_create_available_hosts_iterator():
    """With nothing reserved, the first address offered for the /24 is .1."""
    free_hosts = create_available_hosts_iterator("192.168.1.0/24", [])
    assert str(next(free_hosts)) == "192.168.1.1"
def test_create_available_hosts_iterator_with_unavailable_ips():
    """Addresses passed as unavailable are skipped by the iterator."""
    taken = ["192.168.1.1", "192.168.1.2"]
    free_hosts = create_available_hosts_iterator("192.168.1.0/24", taken)
    assert str(next(free_hosts)) == "192.168.1.3"
def test_network_available_hosts_iterator(test_network):
    """IPs already assigned to a network's hosts are not offered again."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")

    # Occupy the first two addresses of the range.
    for host_name, ip in (("host1", "10.0.0.1"), ("host2", "10.0.0.2")):
        Host.objects.create(
            network=net,
            name=host_name,
            assigned_ip=ip,
            public_key="test",
        )

    next_free = next(network_available_hosts_iterator(net))
    assert str(next_free) == "10.0.0.3"
def test_create_network(test_network):
    """A network built by the fixture keeps its name/CIDR and has an initial CA."""
    expected_ca_name = "auto created initial ca"
    created = test_network(name="testnet", cidr="100.100.64.0/24")

    assert created.name == "testnet"
    assert created.cidr == "100.100.64.0/24"

    # The signing CA must be fully populated: name, cert, key and parsed cert.
    signing_ca = created.signingca.ca
    assert signing_ca.name == expected_ca_name
    assert signing_ca.cert is not None
    assert signing_ca.key is not None
    assert signing_ca.cert_print is not None

    cert_details = signing_ca.cert_print["details"]
    assert cert_details["name"] == expected_ca_name
    assert cert_details["isCa"] is True
def test_cannot_create_duplicate_groups_for_the_same_network(test_network):
    """A second group with the same name in one network raises IntegrityError."""
    net = test_network(name="testnet", cidr="100.100.64.0/24")
    duplicate_name = "group1"

    create_group(net.pk, duplicate_name)
    with pytest.raises(IntegrityError):
        create_group(net.pk, duplicate_name)
def test_cannot_create_a_template_based_on_a_non_existing_group(test_network):
    """create_template raises LookupError for group names that were never created."""
    net = test_network(name="testnet", cidr="100.100.64.0/24")
    template_options = {
        "is_lighthouse": False,
        "is_relay": False,
        "use_relay": True,
        # Neither of these groups exists in the network.
        "groups": ["group1", "group2"],
    }

    with pytest.raises(LookupError):
        create_template("hosts", net.name, **template_options)
def test_generate_config_yaml_with_firewall(test_network):
    """Rules of a host's security group show up in the generated firewall config."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    create_network_ca("test_ca", net)
    _, host_public_key = create_keys()

    target_host = Host.objects.create(
        network=net,
        name="test_host",
        assigned_ip="10.0.0.1",
        public_key=host_public_key,
        interface="nebula1",
    )

    sec_group = create_group(net.id, "test_security_group", "Test security group")
    target_host.groups.add(sec_group)
    target_host.save()

    peer_group_a = create_group(net.id, "group1")
    peer_group_b = create_group(net.id, "group2")

    # Inbound rule that sets every optional attribute.
    full_inbound = Rule.objects.create(
        security_group=sec_group,
        direction=Rule.Direction.INBOUND,
        port="80",
        proto="tcp",
        cidr="0.0.0.0/0",
        local_cidr="10.0.0.0/24",
        group=peer_group_a,
    )
    full_inbound.groups.add(peer_group_a, peer_group_b)

    # Outbound rules: one with port/proto/cidr, one with only a protocol.
    Rule.objects.create(
        security_group=sec_group,
        direction=Rule.Direction.OUTBOUND,
        port="443",
        proto="tcp",
        cidr="0.0.0.0/0",
    )
    Rule.objects.create(
        security_group=sec_group,
        direction=Rule.Direction.OUTBOUND,
        proto="udp",
    )

    rendered = yaml.safe_load(generate_config_yaml(target_host.id))

    # The firewall section exists and has the expected number of rules.
    assert "firewall" in rendered
    firewall = rendered["firewall"]
    assert "inbound" in firewall
    assert "outbound" in firewall
    assert len(firewall["inbound"]) == 1
    assert len(firewall["outbound"]) == 2

    # Inbound rule carries all attributes.
    inbound_rendered = firewall["inbound"][0]
    for field, expected in (
        ("port", "80"),
        ("proto", "tcp"),
        ("cidr", "0.0.0.0/0"),
        ("local_cidr", "10.0.0.0/24"),
        ("group", "group1"),
    ):
        assert inbound_rendered[field] == expected
    assert set(inbound_rendered["groups"]) == {"group1", "group2"}

    simple_outbound, minimal_outbound = firewall["outbound"]

    # Simple outbound rule.
    for field, expected in (("port", "443"), ("proto", "tcp"), ("cidr", "0.0.0.0/0")):
        assert simple_outbound[field] == expected

    # Minimal outbound rule: the port is expected as "any", unset fields are omitted.
    assert minimal_outbound["proto"] == "udp"
    assert minimal_outbound["port"] == "any"
    for omitted in ("cidr", "local_cidr", "group", "groups"):
        assert omitted not in minimal_outbound
def test_lighthouse_relay_configuration(test_network):
    """Lighthouse/relay flags and the static host map appear in generated configs."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    create_network_ca("test_ca", net)

    lighthouse_host = Host.objects.create(
        network=net,
        name="lighthouse",
        assigned_ip="10.0.0.1",
        public_key=create_keys()[1],
        is_lighthouse=True,
        is_relay=True,
        public_ip_or_hostname="public.lighthouse.com",
    )
    client_host = Host.objects.create(
        network=net,
        name="client",
        assigned_ip="10.0.0.2",
        public_key=create_keys()[1],
        use_relay=True,
    )

    # The lighthouse advertises itself as both lighthouse and relay.
    rendered_lighthouse = yaml.safe_load(generate_config_yaml(lighthouse_host.id))
    assert rendered_lighthouse["lighthouse"]["am_lighthouse"] is True
    assert rendered_lighthouse["relay"]["am_relay"] is True

    # The client points at the lighthouse and is allowed to use relays.
    rendered_client = yaml.safe_load(generate_config_yaml(client_host.id))
    client_lighthouse_section = rendered_client["lighthouse"]
    assert client_lighthouse_section["am_lighthouse"] is False
    assert client_lighthouse_section["hosts"] == ["10.0.0.1"]
    assert rendered_client["static_host_map"]["10.0.0.1"] == [
        "public.lighthouse.com:4242"
    ]
    assert rendered_client["relay"]["use_relays"] is True
def test_enrollment_with_existing_host_cases(test_network):
    """Re-enrolling the same keys fails or replaces the host per enroll_on_existence."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    token = generate_enrollment_token(create_template("test_template", net.name))

    auth_jwk = JWK.generate(kty="RSA")
    exported_auth_key = auth_jwk.export_public()
    _, net_public_key = create_keys()

    # The same token and key material is reused by every enrollment below.
    credentials = {
        "enrollment_key": token,
        "public_net_key": net_public_key,
        "public_auth_key": exported_auth_key,
    }

    # Initial host
    original_host = enrollment(
        preferred_hostname="test_host",
        public_ip="127.0.0.1",
        enroll_on_existence=False,
        **credentials,
    )
    original_id = original_host.id

    # Case 1: enroll_on_existence=True should raise ValueError
    with pytest.raises(ValueError, match="Host already enrolled"):
        enrollment(
            preferred_hostname="test_host2",
            public_ip="127.0.0.2",
            enroll_on_existence=True,
            **credentials,
        )

    # Case 2: enroll_on_existence=False should delete old host and create new one
    replacement = enrollment(
        preferred_hostname="test_host2",
        public_ip="127.0.0.2",
        enroll_on_existence=False,
        **credentials,
    )
    assert replacement.id != original_id
    assert replacement.name == "test_host2"
    assert replacement.public_ip_or_hostname == "127.0.0.2"
    assert not Host.objects.filter(id=original_id).exists()
def test_enrollment_lighthouse_without_public_ip(test_network):
    """Enrolling via a lighthouse template with public_ip=None raises ValueError."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    lighthouse_template = create_template(
        "test_template", net.name, is_lighthouse=True, is_relay=False
    )
    token = generate_enrollment_token(lighthouse_template)
    exported_auth_key = JWK.generate(kty="RSA").export_public()
    expected_error = "Cannot enroll a lighthouse without public_ip"

    with pytest.raises(ValueError, match=expected_error):
        enrollment(
            enrollment_key=token,
            public_net_key="test_key",
            public_auth_key=exported_auth_key,
            preferred_hostname="test_host",
            public_ip=None,
            enroll_on_existence=False,
        )
def test_enrollment_hostname_increment(test_network):
    """Repeated enrollments with one preferred hostname get -1, -2 suffixes."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    token = generate_enrollment_token(create_template("test_template", net.name))

    for attempt in range(3):
        auth_jwk = JWK.generate(kty="RSA")
        _, net_public_key = create_keys()

        enrolled = enrollment(
            enrollment_key=token,
            public_net_key=net_public_key,
            public_auth_key=auth_jwk.export_public(),
            preferred_hostname="test-host",
            public_ip=f"127.0.0.{attempt + 1}",
            enroll_on_existence=False,
        )

        # The first host keeps the plain name; later ones get a numeric suffix.
        expected_name = "test-host" if attempt == 0 else f"test-host-{attempt}"
        assert enrolled.name == expected_name
def test_template_with_security_group(test_network):
    """A host enrolled through a template receives the template's security group."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    create_network_ca("test_ca", net)

    sec_group = create_group(net.id, "test_security_group", "Test security group")
    allowed_group = create_group(net.id, "group1")
    Rule.objects.create(
        security_group=sec_group,
        direction=Rule.Direction.INBOUND,
        port="80",
        proto="tcp",
        group=allowed_group,
    )

    group_template = create_template(
        "test_template",
        net.name,
        is_lighthouse=False,
        is_relay=False,
        use_relay=True,
        groups=[sec_group.name],
    )
    token = generate_enrollment_token(group_template)

    auth_jwk = JWK.generate(kty="RSA")
    exported_auth_key = auth_jwk.export_public()
    _, net_public_key = create_keys()

    enrolled = enrollment(
        enrollment_key=token,
        public_net_key=net_public_key,
        public_auth_key=exported_auth_key,
        preferred_hostname="test-host",
        public_ip="127.0.0.1",
        enroll_on_existence=False,
    )

    # Exactly the template's group is attached to the new host.
    assert enrolled.groups.count() == 1
    assert enrolled.groups.first() == sec_group
def test_non_reusable_enrollment_key(test_network):
    """A token from a reusable=False template works once, then raises ValueError."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    single_use = create_template(
        "single_use_template",
        net.name,
        reusable=False,
    )
    token = generate_enrollment_token(single_use)

    def fresh_keys():
        # Each enrollment attempt uses its own auth key and network key.
        exported_auth_key = JWK.generate(kty="RSA").export_public()
        _, net_public_key = create_keys()
        return exported_auth_key, net_public_key

    # First enrollment should succeed
    first_auth_key, first_net_key = fresh_keys()
    enrolled = enrollment(
        enrollment_key=token,
        public_net_key=first_net_key,
        public_auth_key=first_auth_key,
        preferred_hostname="test-host",
        public_ip="127.0.0.1",
        enroll_on_existence=False,
    )
    assert enrolled is not None
    assert enrolled.name == "test-host"

    # Second enrollment with same key should fail
    second_auth_key, second_net_key = fresh_keys()
    expected_error = "Single-use enrollment key has already been used"
    with pytest.raises(ValueError, match=expected_error):
        enrollment(
            enrollment_key=token,
            public_net_key=second_net_key,
            public_auth_key=second_auth_key,
            preferred_hostname="another-host",
            public_ip="127.0.0.2",
            enroll_on_existence=False,
        )
def test_enrollment_key_with_usage_limit(test_network):
    """A token with usage_limit=2 allows two enrollments and rejects the third."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    limited = create_template(
        "limited_use_template",
        net.name,
        reusable=True,
        usage_limit=2,
    )
    token = generate_enrollment_token(limited)

    def fresh_keys():
        # Each enrollment attempt uses its own auth key and network key.
        exported_auth_key = JWK.generate(kty="RSA").export_public()
        _, net_public_key = create_keys()
        return exported_auth_key, net_public_key

    # First two enrollments should succeed
    for attempt in range(2):
        auth_key, net_key = fresh_keys()
        hostname = f"test-host-{attempt}"

        enrolled = enrollment(
            enrollment_key=token,
            public_net_key=net_key,
            public_auth_key=auth_key,
            preferred_hostname=hostname,
            public_ip=f"127.0.0.{attempt + 1}",
            enroll_on_existence=False,
        )

        assert enrolled is not None
        assert enrolled.name == hostname

    # Third enrollment should fail
    auth_key, net_key = fresh_keys()
    with pytest.raises(ValueError, match="Enrollment key usage limit exceeded"):
        enrollment(
            enrollment_key=token,
            public_net_key=net_key,
            public_auth_key=auth_key,
            preferred_hostname="test-host-3",
            public_ip="127.0.0.3",
            enroll_on_existence=False,
        )
def test_expired_enrollment_key(test_network):
    """A token whose template expired a day ago is rejected with ValueError."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    yesterday = timezone.now() - timedelta(days=1)
    expired = create_template(
        "expired_template",
        net.name,
        expires_at=yesterday,
    )
    token = generate_enrollment_token(expired)

    exported_auth_key = JWK.generate(kty="RSA").export_public()
    _, net_public_key = create_keys()

    # Enrollment should fail
    with pytest.raises(ValueError, match="Enrollment token has expired"):
        enrollment(
            enrollment_key=token,
            public_net_key=net_public_key,
            public_auth_key=exported_auth_key,
            preferred_hostname="test-host",
            public_ip="127.0.0.1",
            enroll_on_existence=False,
        )
def test_ephemeral_peers_flag(test_network):
    """The template's ephemeral_peers setting is copied to host.is_ephemeral."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")

    # (template name, ephemeral_peers, hostname, public ip); True case runs first.
    scenarios = (
        ("ephemeral_template", True, "test-host", "127.0.0.1"),
        ("non_ephemeral_template", False, "test-host-2", "127.0.0.2"),
    )

    for template_name, ephemeral, hostname, ip in scenarios:
        scenario_template = create_template(
            template_name,
            net.name,
            ephemeral_peers=ephemeral,
        )
        token = generate_enrollment_token(scenario_template)

        exported_auth_key = JWK.generate(kty="RSA").export_public()
        _, net_public_key = create_keys()

        enrolled = enrollment(
            enrollment_key=token,
            public_net_key=net_public_key,
            public_auth_key=exported_auth_key,
            preferred_hostname=hostname,
            public_ip=ip,
            enroll_on_existence=False,
        )

        assert enrolled is not None
        assert enrolled.is_ephemeral is ephemeral
def test_token_with_nonexistent_template(test_network):
    """A token whose template was deleted is rejected with 'Template not found'."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    doomed_template = create_template("test_template", net.name)
    token = generate_enrollment_token(doomed_template)

    # Remove the template after the token was issued.
    doomed_template.delete()

    exported_auth_key = JWK.generate(kty="RSA").export_public()
    _, net_public_key = create_keys()

    with pytest.raises(ValueError, match="Template not found"):
        enrollment(
            enrollment_key=token,
            public_net_key=net_public_key,
            public_auth_key=exported_auth_key,
            preferred_hostname="test-host",
            public_ip="127.0.0.1",
            enroll_on_existence=False,
        )
def test_apply_group_config_overrides(test_network):
    """Dotted-key overrides from several groups end up as nested config values."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    group1 = create_group(net.id, "group1")
    group2 = create_group(net.id, "group2")

    # (owning group, dotted key, raw string value)
    overrides = (
        (group1, "lighthouse.serve_dns", "true"),
        (group1, "lighthouse.dns.port", "53"),
        (group2, "lighthouse.dns.host", "8.8.8.8"),
    )
    for owner, dotted_key, raw_value in overrides:
        GroupConfig.objects.create(
            group=owner,
            key=dotted_key,
            value=raw_value,
        )

    base_config = {
        "firewall": {"outbound": [{}]},
    }
    merged = apply_group_config_overrides(base_config, [group1, group2])

    # The test expects "true" and "53" to come back as a bool and an int.
    lighthouse_section = merged["lighthouse"]
    assert lighthouse_section["serve_dns"] is True
    assert lighthouse_section["dns"]["port"] == 53
    assert lighthouse_section["dns"]["host"] == "8.8.8.8"
def test_generate_config_yaml_with_config_overrides(test_network):
    """Group config overrides of a host's group appear in its generated YAML."""
    net = test_network(name="testnet", cidr="10.0.0.0/24")
    create_network_ca("test_ca", net)
    override_group = create_group(net.id, "testgroup")
    _, host_public_key = create_keys()

    target_host = Host.objects.create(
        network=net,
        name="test_host",
        assigned_ip="10.0.0.1",
        public_key=host_public_key,
        interface="nebula1",
    )
    target_host.groups.add(override_group)

    for dotted_key, raw_value in (
        ("lighthouse.serve_dns", "true"),
        ("lighthouse.dns.port", "53"),
    ):
        GroupConfig.objects.create(
            group=override_group,
            key=dotted_key,
            value=raw_value,
        )

    rendered = yaml.safe_load(generate_config_yaml(target_host.id))
    lighthouse_section = rendered["lighthouse"]
    assert lighthouse_section["serve_dns"] is True
    assert lighthouse_section["dns"]["port"] == 53