Coverage for src/meshadmin/server/networks/tests/test_api.py: 100%
150 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-04 09:21 +0200
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-04 09:21 +0200
1import json
2from datetime import timedelta
3from pathlib import Path
5import pytest
6from django.utils import timezone
7from jwcrypto.jwk import JWK
8from jwcrypto.jwt import JWT
10from meshadmin.common.utils import create_keys
11from meshadmin.server.networks import schemas
12from meshadmin.server.networks.models import Host, Template
13from meshadmin.server.networks.services import (
14 create_template,
15 generate_enrollment_token,
16)
19@pytest.fixture
20def keycloak_key():
21 key = JWK.generate(kty="RSA", size=2048, kid="test-key-id")
22 return {
23 "private_key": key,
24 "public_key": json.loads(key.export_public()),
25 "kid": "test-key-id",
26 }
29@pytest.fixture
30def keycloak_auth_headers(mocker, keycloak_key, settings):
31 jwt_token = JWT(
32 header={"alg": "RS256", "kid": keycloak_key["kid"]},
33 claims={
34 "exp": 9999999999,
35 "iat": 1741328648,
36 "iss": settings.KEYCLOAK_ISSUER,
37 "azp": settings.KEYCLOAK_ADMIN_CLIENT,
38 "typ": "Bearer",
39 },
40 )
41 jwt_token.make_signed_token(keycloak_key["private_key"])
42 mock_get = mocker.patch("requests.get")
43 mock_get.return_value.json.return_value = {"keys": [keycloak_key["public_key"]]}
44 mock_get.return_value.raise_for_status.return_value = None
46 mocker.patch(
47 "meshadmin.server.networks.api.KeycloakAuthBearer.get_keycloak_public_key",
48 return_value={"keys": [keycloak_key["public_key"]]},
49 )
50 return {"HTTP_AUTHORIZATION": f"Bearer {jwt_token.serialize()}"}
53def test_template_endpoints(db, client, test_network, keycloak_auth_headers):
54 network = test_network(name="test_network", cidr="100.100.64.0/24")
55 template_data = schemas.TemplateCreate(
56 name="test_template",
57 network_name=network.name,
58 is_lighthouse=True,
59 is_relay=True,
60 use_relay=False,
61 )
62 response = client.post(
63 "/api/v1/templates",
64 data=template_data.model_dump_json(),
65 content_type="application/json",
66 **keycloak_auth_headers,
67 )
68 assert response.status_code == 200
69 response_data = response.json()
70 assert response_data["name"] == "test_template"
71 assert "enrollment_key" in response_data
73 # Delete template
74 response = client.delete(
75 "/api/v1/templates/test_template",
76 **keycloak_auth_headers,
77 )
78 assert response.status_code == 200
79 assert not Template.objects.filter(name="test_template").exists()
82def test_host_endpoints(client, test_network, keycloak_auth_headers):
83 network = test_network(name="test_network", cidr="100.100.64.0/24")
84 # First create and enroll a host
85 template = create_template(
86 "host_template",
87 network.name,
88 is_lighthouse=False,
89 is_relay=False,
90 )
91 token = generate_enrollment_token(template)
92 auth_key = JWK.generate(kty="RSA", size=2048)
93 _, public_net_key = create_keys()
94 enrollment_data = schemas.ClientEnrollment(
95 enrollment_key=token,
96 public_net_key=public_net_key,
97 public_auth_key=auth_key.export_public(),
98 preferred_hostname="test-host",
99 public_ip="127.0.0.1",
100 enroll_on_existence=False,
101 )
102 response = client.post(
103 "/api/v1/enroll",
104 data=enrollment_data.model_dump_json(),
105 content_type="application/json",
106 )
107 assert response.status_code == 200
109 # Delete host
110 response = client.delete(
111 "/api/v1/hosts/test-host",
112 **keycloak_auth_headers,
113 )
114 assert response.status_code == 200
115 assert not Host.objects.filter(name="test-host").exists()
118def test_unauthorized_access(db, client):
119 endpoints = [
120 ("POST", "/api/v1/networks"),
121 ("GET", "/api/v1/networks"),
122 ("DELETE", "/api/v1/networks/test"),
123 ("POST", "/api/v1/templates"),
124 ("DELETE", "/api/v1/templates/test"),
125 ("DELETE", "/api/v1/hosts/test"),
126 ]
128 for method, endpoint in endpoints:
129 if method == "POST":
130 response = client.post(
131 endpoint,
132 data="{}",
133 content_type="application/json",
134 )
135 else:
136 response = (
137 client.get(endpoint) if method == "GET" else client.delete(endpoint)
138 )
139 assert response.status_code == 401, f"{method} {endpoint} should require auth"
142def test_wrong_client_id(db, client, keycloak_key, settings):
143 jwt_token = JWT(
144 header={"alg": "RS256", "kid": keycloak_key["kid"]},
145 claims={
146 "exp": 9999999999,
147 "iat": 1741328648,
148 "iss": settings.KEYCLOAK_ISSUER,
149 "azp": "wrong-client", # Not admin-cli
150 "typ": "Bearer",
151 },
152 )
153 jwt_token.make_signed_token(keycloak_key["private_key"])
154 headers = {"HTTP_AUTHORIZATION": f"Bearer {jwt_token.serialize()}"}
155 response = client.get("/api/v1/networks", **headers)
156 assert response.status_code == 401
159def test_wrong_signature(db, client, keycloak_key, mocker):
160 different_key = JWK.generate(kty="RSA", size=2048)
161 jwt_token = JWT(
162 header={"alg": "RS256", "kid": keycloak_key["kid"]},
163 claims={
164 "exp": 9999999999,
165 "iat": 1741328648,
166 "iss": "http://localhost:8080/realms/meshadmin",
167 "azp": "admin-cli",
168 "typ": "Bearer",
169 },
170 )
171 jwt_token.make_signed_token(different_key)
172 mock_response = mocker.Mock()
173 mock_response.json.return_value = {"keys": [keycloak_key["public_key"]]}
174 mock_response.raise_for_status.return_value = None
175 mocker.patch("requests.get", return_value=mock_response)
176 headers = {"HTTP_AUTHORIZATION": f"Bearer {jwt_token.serialize()}"}
177 response = client.get("/api/v1/networks", **headers)
178 assert response.status_code == 401
181def test_get_config(db, client, test_network):
182 network = test_network(name="test_network", cidr="10.0.0.0/24")
183 auth_key = JWK.generate(kty="RSA", size=2048)
184 public_auth_key = auth_key.export_public()
185 _, public_key = create_keys()
187 host = Host.objects.create(
188 network=network,
189 name="test-host",
190 assigned_ip="10.0.0.1",
191 public_key=public_key,
192 public_auth_key=public_auth_key,
193 public_auth_kid=auth_key.thumbprint(),
194 )
195 jwt_token = JWT(
196 header={"alg": "RS256", "kid": auth_key.thumbprint()},
197 claims={
198 "exp": 9999999999,
199 "kid": auth_key.thumbprint(),
200 },
201 )
202 jwt_token.make_signed_token(auth_key)
203 host_auth_headers = {"HTTP_AUTHORIZATION": f"Bearer {jwt_token.serialize()}"}
204 response = client.get(
205 "/api/v1/config",
206 **host_auth_headers,
207 )
209 assert response.status_code == 200
210 assert response["Content-Type"] == "text/yaml"
211 host.refresh_from_db()
212 assert host.last_config_refresh is not None
213 assert host.last_config_refresh > timezone.now() - timedelta(minutes=1)
216def test_cleanup_ephemeral_hosts(db, client, test_network):
217 network = test_network(name="test_network", cidr="10.0.0.0/24")
218 auth_key = JWK.generate(kty="RSA", size=2048)
219 public_auth_key = auth_key.export_public()
220 _, public_key = create_keys()
222 Host.objects.create(
223 network=network,
224 name="requesting-host",
225 assigned_ip="10.0.0.10",
226 public_key=public_key,
227 public_auth_key=public_auth_key,
228 public_auth_kid=auth_key.thumbprint(),
229 )
230 jwt_token = JWT(
231 header={"alg": "RS256", "kid": auth_key.thumbprint()},
232 claims={
233 "exp": 9999999999,
234 "kid": auth_key.thumbprint(),
235 },
236 )
237 jwt_token.make_signed_token(auth_key)
238 host_auth_headers = {"HTTP_AUTHORIZATION": f"Bearer {jwt_token.serialize()}"}
240 # Create some ephemeral hosts
241 # 1. A stale host (last_config_refresh > 10 minutes ago)
242 stale_host = Host.objects.create(
243 network=network,
244 name="stale-host",
245 assigned_ip="10.0.0.1",
246 public_key="test-key-1",
247 public_auth_key="{}",
248 public_auth_kid="test-kid-1",
249 is_ephemeral=True,
250 last_config_refresh=timezone.now() - timedelta(minutes=15),
251 )
253 # 2. A recently seen ephemeral host (should not be removed)
254 recent_host = Host.objects.create(
255 network=network,
256 name="recent-host",
257 assigned_ip="10.0.0.2",
258 public_key="test-key-2",
259 public_auth_key="{}",
260 public_auth_kid="test-kid-2",
261 is_ephemeral=True,
262 last_config_refresh=timezone.now() - timedelta(minutes=5),
263 )
265 # 3. A non-ephemeral host (should not be removed regardless of last_config_refresh)
266 non_ephemeral_host = Host.objects.create(
267 network=network,
268 name="non-ephemeral-host",
269 assigned_ip="10.0.0.3",
270 public_key="test-key-3",
271 public_auth_key="{}",
272 public_auth_kid="test-kid-3",
273 is_ephemeral=False,
274 last_config_refresh=timezone.now() - timedelta(minutes=30),
275 )
277 response = client.post(
278 "/api/v1/cleanup-ephemeral",
279 **host_auth_headers,
280 )
282 assert response.status_code == 200
283 result = response.json()
284 assert result["removed_count"] == 1
285 assert not Host.objects.filter(id=stale_host.id).exists()
286 assert Host.objects.filter(id=recent_host.id).exists()
287 assert Host.objects.filter(id=non_ephemeral_host.id).exists()
289 # Call the endpoint again - should remove no hosts
290 response = client.post(
291 "/api/v1/cleanup-ephemeral",
292 **host_auth_headers,
293 )
294 assert response.status_code == 200
295 result = response.json()
296 assert result["removed_count"] == 0
299def test_enrollment_api_with_jwt(test_network, client):
300 network = test_network(name="testnet", cidr="10.0.0.0/24")
301 template = create_template(
302 "jwt_template",
303 network.name,
304 )
305 token = generate_enrollment_token(template)
306 auth_key = JWK.generate(kty="RSA", size=2048)
307 _, public_net_key = create_keys()
308 enrollment_data = schemas.ClientEnrollment(
309 enrollment_key=token,
310 public_net_key=public_net_key,
311 public_auth_key=auth_key.export_public(),
312 preferred_hostname="test-host",
313 public_ip="127.0.0.1",
314 enroll_on_existence=False,
315 )
316 response = client.post(
317 "/api/v1/enroll",
318 data=enrollment_data.model_dump_json(),
319 content_type="application/json",
320 )
321 assert response.status_code == 200
322 assert Host.objects.filter(name="test-host").exists()
325def test_download_nebula_binary(client, mocker):
326 mock_content = b"mock binary content"
327 mock_file = mocker.mock_open(read_data=mock_content)
328 mocker.patch("builtins.open", mock_file)
329 mocker.patch("meshadmin.server.assets.asset_path")
330 mocker.patch.object(Path, "exists", return_value=True)
332 # Test valid download for Linux (x86_64)
333 response = client.get("/api/v1/nebula/download/Linux/x86_64/nebula")
334 assert response.status_code == 200
335 assert response["Content-Type"] == "application/octet-stream"
336 assert response["Content-Disposition"] == 'attachment; filename="nebula"'
337 content = b"".join(response.streaming_content)
338 assert content == mock_content
340 # Test valid download for Linux (aarch64)
341 response = client.get("/api/v1/nebula/download/Linux/aarch64/nebula")
342 assert response.status_code == 200
344 # Test valid download for Darwin (arm64)
345 response = client.get("/api/v1/nebula/download/Darwin/arm64/nebula-cert")
346 assert response.status_code == 200
348 # Test invalid OS
349 response = client.get("/api/v1/nebula/download/Windows/x86_64/nebula")
350 assert response.status_code == 400
351 assert "Only Linux and Darwin are supported" in str(response.content)
353 # Test invalid architecture for Darwin
354 response = client.get("/api/v1/nebula/download/Darwin/x86_64/nebula")
355 assert response.status_code == 400
356 assert "Supported architectures for Darwin: ['arm64']" in str(response.content)
358 # Test invalid architecture for Linux
359 response = client.get("/api/v1/nebula/download/Linux/arm64/nebula")
360 assert response.status_code == 400
361 assert "Supported architectures for Linux: ['aarch64', 'x86_64']" in str(
362 response.content
363 )
365 # Test invalid binary name
366 response = client.get("/api/v1/nebula/download/Linux/x86_64/invalid")
367 assert response.status_code == 400
369 # Test binary not found
370 mocker.patch.object(Path, "exists", return_value=False)
371 response = client.get("/api/v1/nebula/download/Linux/x86_64/nebula")
372 assert response.status_code == 404
373 assert "Binary not found" in str(response.content)