Coverage for src/meshadmin/server/networks/tests/test_services.py: 100%

240 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-25 08:49 +0200

1from datetime import timedelta 

2 

3import pytest 

4import yaml 

5from django.db import IntegrityError 

6from django.utils import timezone 

7from jwcrypto.jwk import JWK 

8 

9from meshadmin.common.utils import create_keys 

10from meshadmin.server.networks.models import GroupConfig, Host, Rule 

11from meshadmin.server.networks.services import ( 

12 apply_group_config_overrides, 

13 create_available_hosts_iterator, 

14 create_group, 

15 create_network_ca, 

16 create_template, 

17 enrollment, 

18 generate_config_yaml, 

19 generate_enrollment_token, 

20 network_available_hosts_iterator, 

21) 

22 

23 

24def test_create_available_hosts_iterator(): 

25 cidr = "192.168.1.0/24" 

26 iterator = create_available_hosts_iterator(cidr, []) 

27 first_ip = next(iterator) 

28 assert str(first_ip) == "192.168.1.1" 

29 

30 

31def test_create_available_hosts_iterator_with_unavailable_ips(): 

32 cidr = "192.168.1.0/24" 

33 unavailable = ["192.168.1.1", "192.168.1.2"] 

34 iterator = create_available_hosts_iterator(cidr, unavailable) 

35 first_available = next(iterator) 

36 assert str(first_available) == "192.168.1.3" 

37 

38 

39def test_network_available_hosts_iterator(test_network): 

40 network = test_network(name="testnet", cidr="10.0.0.0/24") 

41 Host.objects.create( 

42 network=network, 

43 name="host1", 

44 assigned_ip="10.0.0.1", 

45 public_key="test", 

46 ) 

47 Host.objects.create( 

48 network=network, 

49 name="host2", 

50 assigned_ip="10.0.0.2", 

51 public_key="test", 

52 ) 

53 iterator = network_available_hosts_iterator(network) 

54 first_available = next(iterator) 

55 assert str(first_available) == "10.0.0.3" 

56 

57 

58def test_create_network(test_network): 

59 network_name = "testnet" 

60 network_cidr = "100.100.64.0/24" 

61 test_net = test_network(name=network_name, cidr=network_cidr) 

62 assert test_net.name == network_name 

63 assert test_net.cidr == network_cidr 

64 assert test_net.signingca.ca.name == "auto created initial ca" 

65 assert test_net.signingca.ca.cert is not None 

66 assert test_net.signingca.ca.key is not None 

67 assert test_net.signingca.ca.cert_print is not None 

68 assert ( 

69 test_net.signingca.ca.cert_print["details"]["name"] == "auto created initial ca" 

70 ) 

71 assert test_net.signingca.ca.cert_print["details"]["isCa"] is True 

72 

73 

74def test_cannot_create_duplicate_groups_for_the_same_network(test_network): 

75 test_net = test_network(name="testnet", cidr="100.100.64.0/24") 

76 create_group(test_net.pk, "group1") 

77 with pytest.raises(IntegrityError): 

78 create_group(test_net.pk, "group1") 

79 

80 

81def test_cannot_create_a_template_based_on_a_non_existing_group(test_network): 

82 test_net = test_network(name="testnet", cidr="100.100.64.0/24") 

83 with pytest.raises(LookupError): 

84 create_template( 

85 "hosts", 

86 test_net.name, 

87 is_lighthouse=False, 

88 is_relay=False, 

89 use_relay=True, 

90 groups=["group1", "group2"], 

91 ) 

92 

93 

94def test_generate_config_yaml_with_firewall(test_network): 

95 network = test_network(name="testnet", cidr="10.0.0.0/24") 

96 create_network_ca("test_ca", network) 

97 _, public_key = create_keys() 

98 

99 host = Host.objects.create( 

100 network=network, 

101 name="test_host", 

102 assigned_ip="10.0.0.1", 

103 public_key=public_key, 

104 interface="nebula1", 

105 ) 

106 

107 security_group = create_group( 

108 network.id, "test_security_group", "Test security group" 

109 ) 

110 host.groups.add(security_group) 

111 host.save() 

112 

113 group1 = create_group(network.id, "group1") 

114 group2 = create_group(network.id, "group2") 

115 rule = Rule.objects.create( 

116 security_group=security_group, 

117 direction=Rule.Direction.INBOUND, 

118 port="80", 

119 proto="tcp", 

120 cidr="0.0.0.0/0", 

121 local_cidr="10.0.0.0/24", 

122 group=group1, 

123 ) 

124 rule.groups.add(group1, group2) 

125 

126 # Outbound rules 

127 Rule.objects.create( 

128 security_group=security_group, 

129 direction=Rule.Direction.OUTBOUND, 

130 port="443", 

131 proto="tcp", 

132 cidr="0.0.0.0/0", 

133 ) 

134 Rule.objects.create( 

135 security_group=security_group, 

136 direction=Rule.Direction.OUTBOUND, 

137 proto="udp", 

138 ) 

139 

140 config_yaml = generate_config_yaml(host.id) 

141 config_dict = yaml.safe_load(config_yaml) 

142 

143 # Verify firewall configuration exists 

144 assert "firewall" in config_dict 

145 assert "inbound" in config_dict["firewall"] 

146 assert "outbound" in config_dict["firewall"] 

147 assert len(config_dict["firewall"]["inbound"]) == 1 

148 assert len(config_dict["firewall"]["outbound"]) == 2 

149 

150 # Verify inbound rule with all attributes 

151 inbound_rule = config_dict["firewall"]["inbound"][0] 

152 assert inbound_rule["port"] == "80" 

153 assert inbound_rule["proto"] == "tcp" 

154 assert inbound_rule["cidr"] == "0.0.0.0/0" 

155 assert inbound_rule["local_cidr"] == "10.0.0.0/24" 

156 assert inbound_rule["group"] == "group1" 

157 assert set(inbound_rule["groups"]) == {"group1", "group2"} 

158 

159 # Verify simple outbound rule 

160 outbound_rule = config_dict["firewall"]["outbound"][0] 

161 assert outbound_rule["port"] == "443" 

162 assert outbound_rule["proto"] == "tcp" 

163 assert outbound_rule["cidr"] == "0.0.0.0/0" 

164 

165 # Verify minimal outbound rule 

166 minimal_rule = config_dict["firewall"]["outbound"][1] 

167 assert minimal_rule["proto"] == "udp" 

168 assert minimal_rule["port"] == "any" 

169 assert "cidr" not in minimal_rule 

170 assert "local_cidr" not in minimal_rule 

171 assert "group" not in minimal_rule 

172 assert "groups" not in minimal_rule 

173 

174 

175def test_lighthouse_relay_configuration(test_network): 

176 network = test_network(name="testnet", cidr="10.0.0.0/24") 

177 create_network_ca("test_ca", network) 

178 

179 lighthouse = Host.objects.create( 

180 network=network, 

181 name="lighthouse", 

182 assigned_ip="10.0.0.1", 

183 public_key=create_keys()[1], 

184 is_lighthouse=True, 

185 is_relay=True, 

186 public_ip_or_hostname="public.lighthouse.com", 

187 ) 

188 host = Host.objects.create( 

189 network=network, 

190 name="client", 

191 assigned_ip="10.0.0.2", 

192 public_key=create_keys()[1], 

193 use_relay=True, 

194 ) 

195 

196 lighthouse_config = yaml.safe_load(generate_config_yaml(lighthouse.id)) 

197 assert lighthouse_config["lighthouse"]["am_lighthouse"] is True 

198 assert lighthouse_config["relay"]["am_relay"] is True 

199 

200 client_config = yaml.safe_load(generate_config_yaml(host.id)) 

201 assert client_config["lighthouse"]["am_lighthouse"] is False 

202 assert client_config["lighthouse"]["hosts"] == ["10.0.0.1"] 

203 assert client_config["static_host_map"]["10.0.0.1"] == [ 

204 "public.lighthouse.com:4242" 

205 ] 

206 assert client_config["relay"]["use_relays"] is True 

207 

208 

209def test_enrollment_with_existing_host_cases(test_network): 

210 network = test_network(name="testnet", cidr="10.0.0.0/24") 

211 template = create_template("test_template", network.name) 

212 token = generate_enrollment_token(template) 

213 

214 auth_key = JWK.generate(kty="RSA") 

215 public_auth_key = auth_key.export_public() 

216 _, public_key = create_keys() 

217 

218 # Initial host 

219 host1 = enrollment( 

220 enrollment_key=token, 

221 public_net_key=public_key, 

222 public_auth_key=public_auth_key, 

223 preferred_hostname="test_host", 

224 public_ip="127.0.0.1", 

225 enroll_on_existence=False, 

226 ) 

227 host1_id = host1.id 

228 

229 # Case 1: enroll_on_existence=True should raise ValueError 

230 with pytest.raises(ValueError, match="Host already enrolled"): 

231 enrollment( 

232 enrollment_key=token, 

233 public_net_key=public_key, 

234 public_auth_key=public_auth_key, 

235 preferred_hostname="test_host2", 

236 public_ip="127.0.0.2", 

237 enroll_on_existence=True, 

238 ) 

239 

240 # Case 2: enroll_on_existence=False should delete old host and create new one 

241 host2 = enrollment( 

242 enrollment_key=token, 

243 public_net_key=public_key, 

244 public_auth_key=public_auth_key, 

245 preferred_hostname="test_host2", 

246 public_ip="127.0.0.2", 

247 enroll_on_existence=False, 

248 ) 

249 assert host2.id != host1_id 

250 assert host2.name == "test_host2" 

251 assert host2.public_ip_or_hostname == "127.0.0.2" 

252 assert not Host.objects.filter(id=host1_id).exists() 

253 

254 

255def test_enrollment_lighthouse_without_public_ip(test_network): 

256 network = test_network(name="testnet", cidr="10.0.0.0/24") 

257 template = create_template( 

258 "test_template", network.name, is_lighthouse=True, is_relay=False 

259 ) 

260 token = generate_enrollment_token(template) 

261 

262 with pytest.raises( 

263 ValueError, match="Cannot enroll a lighthouse without public_ip" 

264 ): 

265 enrollment( 

266 enrollment_key=token, 

267 public_net_key="test_key", 

268 public_auth_key=JWK.generate(kty="RSA").export_public(), 

269 preferred_hostname="test_host", 

270 public_ip=None, 

271 enroll_on_existence=False, 

272 ) 

273 

274 

275def test_enrollment_hostname_increment(test_network): 

276 network = test_network(name="testnet", cidr="10.0.0.0/24") 

277 template = create_template("test_template", network.name) 

278 token = generate_enrollment_token(template) 

279 

280 for i in range(3): 

281 auth_key = JWK.generate(kty="RSA") 

282 _, public_key = create_keys() 

283 

284 host = enrollment( 

285 enrollment_key=token, 

286 public_net_key=public_key, 

287 public_auth_key=auth_key.export_public(), 

288 preferred_hostname="test-host", 

289 public_ip=f"127.0.0.{i + 1}", 

290 enroll_on_existence=False, 

291 ) 

292 

293 if i == 0: 

294 assert host.name == "test-host" 

295 else: 

296 assert host.name == f"test-host-{i}" 

297 

298 

299def test_template_with_security_group(test_network): 

300 network = test_network(name="testnet", cidr="10.0.0.0/24") 

301 create_network_ca("test_ca", network) 

302 security_group = create_group( 

303 network.id, "test_security_group", "Test security group" 

304 ) 

305 group1 = create_group(network.id, "group1") 

306 Rule.objects.create( 

307 security_group=security_group, 

308 direction=Rule.Direction.INBOUND, 

309 port="80", 

310 proto="tcp", 

311 group=group1, 

312 ) 

313 template = create_template( 

314 "test_template", 

315 network.name, 

316 is_lighthouse=False, 

317 is_relay=False, 

318 use_relay=True, 

319 groups=[security_group.name], 

320 ) 

321 token = generate_enrollment_token(template) 

322 auth_key = JWK.generate(kty="RSA") 

323 public_auth_key = auth_key.export_public() 

324 _, public_key = create_keys() 

325 

326 host = enrollment( 

327 enrollment_key=token, 

328 public_net_key=public_key, 

329 public_auth_key=public_auth_key, 

330 preferred_hostname="test-host", 

331 public_ip="127.0.0.1", 

332 enroll_on_existence=False, 

333 ) 

334 

335 assert host.groups.count() == 1 

336 assert host.groups.first() == security_group 

337 

338 

339def test_non_reusable_enrollment_key(test_network): 

340 network = test_network(name="testnet", cidr="10.0.0.0/24") 

341 template = create_template( 

342 "single_use_template", 

343 network.name, 

344 reusable=False, 

345 ) 

346 token = generate_enrollment_token(template) 

347 

348 # First enrollment should succeed 

349 auth_key = JWK.generate(kty="RSA") 

350 public_auth_key = auth_key.export_public() 

351 _, public_key = create_keys() 

352 

353 host = enrollment( 

354 enrollment_key=token, 

355 public_net_key=public_key, 

356 public_auth_key=public_auth_key, 

357 preferred_hostname="test-host", 

358 public_ip="127.0.0.1", 

359 enroll_on_existence=False, 

360 ) 

361 

362 assert host is not None 

363 assert host.name == "test-host" 

364 

365 # Second enrollment with same key should fail 

366 auth_key2 = JWK.generate(kty="RSA") 

367 public_auth_key2 = auth_key2.export_public() 

368 _, public_key2 = create_keys() 

369 

370 with pytest.raises( 

371 ValueError, match="Single-use enrollment key has already been used" 

372 ): 

373 enrollment( 

374 enrollment_key=token, 

375 public_net_key=public_key2, 

376 public_auth_key=public_auth_key2, 

377 preferred_hostname="another-host", 

378 public_ip="127.0.0.2", 

379 enroll_on_existence=False, 

380 ) 

381 

382 

383def test_enrollment_key_with_usage_limit(test_network): 

384 network = test_network(name="testnet", cidr="10.0.0.0/24") 

385 template = create_template( 

386 "limited_use_template", 

387 network.name, 

388 reusable=True, 

389 usage_limit=2, 

390 ) 

391 token = generate_enrollment_token(template) 

392 # First two enrollments should succeed 

393 for i in range(2): 

394 auth_key = JWK.generate(kty="RSA") 

395 public_auth_key = auth_key.export_public() 

396 _, public_key = create_keys() 

397 

398 host = enrollment( 

399 enrollment_key=token, 

400 public_net_key=public_key, 

401 public_auth_key=public_auth_key, 

402 preferred_hostname=f"test-host-{i}", 

403 public_ip=f"127.0.0.{i + 1}", 

404 enroll_on_existence=False, 

405 ) 

406 

407 assert host is not None 

408 assert host.name == f"test-host-{i}" 

409 

410 # Third enrollment should fail 

411 auth_key = JWK.generate(kty="RSA") 

412 public_auth_key = auth_key.export_public() 

413 _, public_key = create_keys() 

414 

415 with pytest.raises(ValueError, match="Enrollment key usage limit exceeded"): 

416 enrollment( 

417 enrollment_key=token, 

418 public_net_key=public_key, 

419 public_auth_key=public_auth_key, 

420 preferred_hostname="test-host-3", 

421 public_ip="127.0.0.3", 

422 enroll_on_existence=False, 

423 ) 

424 

425 

426def test_expired_enrollment_key(test_network): 

427 network = test_network(name="testnet", cidr="10.0.0.0/24") 

428 template = create_template( 

429 "expired_template", 

430 network.name, 

431 expires_at=timezone.now() - timedelta(days=1), 

432 ) 

433 token = generate_enrollment_token(template) 

434 

435 # Enrollment should fail 

436 auth_key = JWK.generate(kty="RSA") 

437 public_auth_key = auth_key.export_public() 

438 _, public_key = create_keys() 

439 

440 with pytest.raises(ValueError, match="Enrollment token has expired"): 

441 enrollment( 

442 enrollment_key=token, 

443 public_net_key=public_key, 

444 public_auth_key=public_auth_key, 

445 preferred_hostname="test-host", 

446 public_ip="127.0.0.1", 

447 enroll_on_existence=False, 

448 ) 

449 

450 

451def test_ephemeral_peers_flag(test_network): 

452 network = test_network(name="testnet", cidr="10.0.0.0/24") 

453 template = create_template( 

454 "ephemeral_template", 

455 network.name, 

456 ephemeral_peers=True, 

457 ) 

458 token = generate_enrollment_token(template) 

459 

460 auth_key = JWK.generate(kty="RSA") 

461 public_auth_key = auth_key.export_public() 

462 _, public_key = create_keys() 

463 

464 host = enrollment( 

465 enrollment_key=token, 

466 public_net_key=public_key, 

467 public_auth_key=public_auth_key, 

468 preferred_hostname="test-host", 

469 public_ip="127.0.0.1", 

470 enroll_on_existence=False, 

471 ) 

472 

473 assert host is not None 

474 assert host.is_ephemeral is True 

475 

476 # Test with ephemeral_peers=False 

477 template2 = create_template( 

478 "non_ephemeral_template", 

479 network.name, 

480 ephemeral_peers=False, 

481 ) 

482 token2 = generate_enrollment_token(template2) 

483 auth_key2 = JWK.generate(kty="RSA") 

484 public_auth_key2 = auth_key2.export_public() 

485 _, public_key2 = create_keys() 

486 

487 host2 = enrollment( 

488 enrollment_key=token2, 

489 public_net_key=public_key2, 

490 public_auth_key=public_auth_key2, 

491 preferred_hostname="test-host-2", 

492 public_ip="127.0.0.2", 

493 enroll_on_existence=False, 

494 ) 

495 

496 assert host2 is not None 

497 assert host2.is_ephemeral is False 

498 

499 

500def test_token_with_nonexistent_template(test_network): 

501 network = test_network(name="testnet", cidr="10.0.0.0/24") 

502 template = create_template("test_template", network.name) 

503 token = generate_enrollment_token(template) 

504 template.delete() 

505 auth_key = JWK.generate(kty="RSA") 

506 public_auth_key = auth_key.export_public() 

507 _, public_key = create_keys() 

508 with pytest.raises(ValueError, match="Template not found"): 

509 enrollment( 

510 enrollment_key=token, 

511 public_net_key=public_key, 

512 public_auth_key=public_auth_key, 

513 preferred_hostname="test-host", 

514 public_ip="127.0.0.1", 

515 enroll_on_existence=False, 

516 ) 

517 

518 

519def test_apply_group_config_overrides(test_network): 

520 network = test_network(name="testnet", cidr="10.0.0.0/24") 

521 group1 = create_group(network.id, "group1") 

522 group2 = create_group(network.id, "group2") 

523 GroupConfig.objects.create( 

524 group=group1, 

525 key="lighthouse.serve_dns", 

526 value="true", 

527 ) 

528 GroupConfig.objects.create( 

529 group=group1, 

530 key="lighthouse.dns.port", 

531 value="53", 

532 ) 

533 GroupConfig.objects.create( 

534 group=group2, 

535 key="lighthouse.dns.host", 

536 value="8.8.8.8", 

537 ) 

538 initial_config = { 

539 "firewall": {"outbound": [{}]}, 

540 } 

541 updated_config = apply_group_config_overrides(initial_config, [group1, group2]) 

542 assert updated_config["lighthouse"]["serve_dns"] is True 

543 assert updated_config["lighthouse"]["dns"]["port"] == 53 

544 assert updated_config["lighthouse"]["dns"]["host"] == "8.8.8.8" 

545 

546 

547def test_generate_config_yaml_with_config_overrides(test_network): 

548 network = test_network(name="testnet", cidr="10.0.0.0/24") 

549 create_network_ca("test_ca", network) 

550 group = create_group(network.id, "testgroup") 

551 _, public_key = create_keys() 

552 host = Host.objects.create( 

553 network=network, 

554 name="test_host", 

555 assigned_ip="10.0.0.1", 

556 public_key=public_key, 

557 interface="nebula1", 

558 ) 

559 host.groups.add(group) 

560 GroupConfig.objects.create( 

561 group=group, 

562 key="lighthouse.serve_dns", 

563 value="true", 

564 ) 

565 GroupConfig.objects.create( 

566 group=group, 

567 key="lighthouse.dns.port", 

568 value="53", 

569 ) 

570 config = generate_config_yaml(host.id) 

571 config_dict = yaml.safe_load(config) 

572 assert config_dict["lighthouse"]["serve_dns"] is True 

573 assert config_dict["lighthouse"]["dns"]["port"] == 53