Coverage for src/meshadmin/server/networks/tests/test_api.py: 100%

150 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-04 09:21 +0200

1import json 

2from datetime import timedelta 

3from pathlib import Path 

4 

5import pytest 

6from django.utils import timezone 

7from jwcrypto.jwk import JWK 

8from jwcrypto.jwt import JWT 

9 

10from meshadmin.common.utils import create_keys 

11from meshadmin.server.networks import schemas 

12from meshadmin.server.networks.models import Host, Template 

13from meshadmin.server.networks.services import ( 

14 create_template, 

15 generate_enrollment_token, 

16) 

17 

18 

19@pytest.fixture 

20def keycloak_key(): 

21 key = JWK.generate(kty="RSA", size=2048, kid="test-key-id") 

22 return { 

23 "private_key": key, 

24 "public_key": json.loads(key.export_public()), 

25 "kid": "test-key-id", 

26 } 

27 

28 

29@pytest.fixture 

30def keycloak_auth_headers(mocker, keycloak_key, settings): 

31 jwt_token = JWT( 

32 header={"alg": "RS256", "kid": keycloak_key["kid"]}, 

33 claims={ 

34 "exp": 9999999999, 

35 "iat": 1741328648, 

36 "iss": settings.KEYCLOAK_ISSUER, 

37 "azp": settings.KEYCLOAK_ADMIN_CLIENT, 

38 "typ": "Bearer", 

39 }, 

40 ) 

41 jwt_token.make_signed_token(keycloak_key["private_key"]) 

42 mock_get = mocker.patch("requests.get") 

43 mock_get.return_value.json.return_value = {"keys": [keycloak_key["public_key"]]} 

44 mock_get.return_value.raise_for_status.return_value = None 

45 

46 mocker.patch( 

47 "meshadmin.server.networks.api.KeycloakAuthBearer.get_keycloak_public_key", 

48 return_value={"keys": [keycloak_key["public_key"]]}, 

49 ) 

50 return {"HTTP_AUTHORIZATION": f"Bearer {jwt_token.serialize()}"} 

51 

52 

53def test_template_endpoints(db, client, test_network, keycloak_auth_headers): 

54 network = test_network(name="test_network", cidr="100.100.64.0/24") 

55 template_data = schemas.TemplateCreate( 

56 name="test_template", 

57 network_name=network.name, 

58 is_lighthouse=True, 

59 is_relay=True, 

60 use_relay=False, 

61 ) 

62 response = client.post( 

63 "/api/v1/templates", 

64 data=template_data.model_dump_json(), 

65 content_type="application/json", 

66 **keycloak_auth_headers, 

67 ) 

68 assert response.status_code == 200 

69 response_data = response.json() 

70 assert response_data["name"] == "test_template" 

71 assert "enrollment_key" in response_data 

72 

73 # Delete template 

74 response = client.delete( 

75 "/api/v1/templates/test_template", 

76 **keycloak_auth_headers, 

77 ) 

78 assert response.status_code == 200 

79 assert not Template.objects.filter(name="test_template").exists() 

80 

81 

82def test_host_endpoints(client, test_network, keycloak_auth_headers): 

83 network = test_network(name="test_network", cidr="100.100.64.0/24") 

84 # First create and enroll a host 

85 template = create_template( 

86 "host_template", 

87 network.name, 

88 is_lighthouse=False, 

89 is_relay=False, 

90 ) 

91 token = generate_enrollment_token(template) 

92 auth_key = JWK.generate(kty="RSA", size=2048) 

93 _, public_net_key = create_keys() 

94 enrollment_data = schemas.ClientEnrollment( 

95 enrollment_key=token, 

96 public_net_key=public_net_key, 

97 public_auth_key=auth_key.export_public(), 

98 preferred_hostname="test-host", 

99 public_ip="127.0.0.1", 

100 enroll_on_existence=False, 

101 ) 

102 response = client.post( 

103 "/api/v1/enroll", 

104 data=enrollment_data.model_dump_json(), 

105 content_type="application/json", 

106 ) 

107 assert response.status_code == 200 

108 

109 # Delete host 

110 response = client.delete( 

111 "/api/v1/hosts/test-host", 

112 **keycloak_auth_headers, 

113 ) 

114 assert response.status_code == 200 

115 assert not Host.objects.filter(name="test-host").exists() 

116 

117 

118def test_unauthorized_access(db, client): 

119 endpoints = [ 

120 ("POST", "/api/v1/networks"), 

121 ("GET", "/api/v1/networks"), 

122 ("DELETE", "/api/v1/networks/test"), 

123 ("POST", "/api/v1/templates"), 

124 ("DELETE", "/api/v1/templates/test"), 

125 ("DELETE", "/api/v1/hosts/test"), 

126 ] 

127 

128 for method, endpoint in endpoints: 

129 if method == "POST": 

130 response = client.post( 

131 endpoint, 

132 data="{}", 

133 content_type="application/json", 

134 ) 

135 else: 

136 response = ( 

137 client.get(endpoint) if method == "GET" else client.delete(endpoint) 

138 ) 

139 assert response.status_code == 401, f"{method} {endpoint} should require auth" 

140 

141 

142def test_wrong_client_id(db, client, keycloak_key, settings): 

143 jwt_token = JWT( 

144 header={"alg": "RS256", "kid": keycloak_key["kid"]}, 

145 claims={ 

146 "exp": 9999999999, 

147 "iat": 1741328648, 

148 "iss": settings.KEYCLOAK_ISSUER, 

149 "azp": "wrong-client", # Not admin-cli 

150 "typ": "Bearer", 

151 }, 

152 ) 

153 jwt_token.make_signed_token(keycloak_key["private_key"]) 

154 headers = {"HTTP_AUTHORIZATION": f"Bearer {jwt_token.serialize()}"} 

155 response = client.get("/api/v1/networks", **headers) 

156 assert response.status_code == 401 

157 

158 

159def test_wrong_signature(db, client, keycloak_key, mocker): 

160 different_key = JWK.generate(kty="RSA", size=2048) 

161 jwt_token = JWT( 

162 header={"alg": "RS256", "kid": keycloak_key["kid"]}, 

163 claims={ 

164 "exp": 9999999999, 

165 "iat": 1741328648, 

166 "iss": "http://localhost:8080/realms/meshadmin", 

167 "azp": "admin-cli", 

168 "typ": "Bearer", 

169 }, 

170 ) 

171 jwt_token.make_signed_token(different_key) 

172 mock_response = mocker.Mock() 

173 mock_response.json.return_value = {"keys": [keycloak_key["public_key"]]} 

174 mock_response.raise_for_status.return_value = None 

175 mocker.patch("requests.get", return_value=mock_response) 

176 headers = {"HTTP_AUTHORIZATION": f"Bearer {jwt_token.serialize()}"} 

177 response = client.get("/api/v1/networks", **headers) 

178 assert response.status_code == 401 

179 

180 

181def test_get_config(db, client, test_network): 

182 network = test_network(name="test_network", cidr="10.0.0.0/24") 

183 auth_key = JWK.generate(kty="RSA", size=2048) 

184 public_auth_key = auth_key.export_public() 

185 _, public_key = create_keys() 

186 

187 host = Host.objects.create( 

188 network=network, 

189 name="test-host", 

190 assigned_ip="10.0.0.1", 

191 public_key=public_key, 

192 public_auth_key=public_auth_key, 

193 public_auth_kid=auth_key.thumbprint(), 

194 ) 

195 jwt_token = JWT( 

196 header={"alg": "RS256", "kid": auth_key.thumbprint()}, 

197 claims={ 

198 "exp": 9999999999, 

199 "kid": auth_key.thumbprint(), 

200 }, 

201 ) 

202 jwt_token.make_signed_token(auth_key) 

203 host_auth_headers = {"HTTP_AUTHORIZATION": f"Bearer {jwt_token.serialize()}"} 

204 response = client.get( 

205 "/api/v1/config", 

206 **host_auth_headers, 

207 ) 

208 

209 assert response.status_code == 200 

210 assert response["Content-Type"] == "text/yaml" 

211 host.refresh_from_db() 

212 assert host.last_config_refresh is not None 

213 assert host.last_config_refresh > timezone.now() - timedelta(minutes=1) 

214 

215 

216def test_cleanup_ephemeral_hosts(db, client, test_network): 

217 network = test_network(name="test_network", cidr="10.0.0.0/24") 

218 auth_key = JWK.generate(kty="RSA", size=2048) 

219 public_auth_key = auth_key.export_public() 

220 _, public_key = create_keys() 

221 

222 Host.objects.create( 

223 network=network, 

224 name="requesting-host", 

225 assigned_ip="10.0.0.10", 

226 public_key=public_key, 

227 public_auth_key=public_auth_key, 

228 public_auth_kid=auth_key.thumbprint(), 

229 ) 

230 jwt_token = JWT( 

231 header={"alg": "RS256", "kid": auth_key.thumbprint()}, 

232 claims={ 

233 "exp": 9999999999, 

234 "kid": auth_key.thumbprint(), 

235 }, 

236 ) 

237 jwt_token.make_signed_token(auth_key) 

238 host_auth_headers = {"HTTP_AUTHORIZATION": f"Bearer {jwt_token.serialize()}"} 

239 

240 # Create some ephemeral hosts 

241 # 1. A stale host (last_config_refresh > 10 minutes ago) 

242 stale_host = Host.objects.create( 

243 network=network, 

244 name="stale-host", 

245 assigned_ip="10.0.0.1", 

246 public_key="test-key-1", 

247 public_auth_key="{}", 

248 public_auth_kid="test-kid-1", 

249 is_ephemeral=True, 

250 last_config_refresh=timezone.now() - timedelta(minutes=15), 

251 ) 

252 

253 # 2. A recently seen ephemeral host (should not be removed) 

254 recent_host = Host.objects.create( 

255 network=network, 

256 name="recent-host", 

257 assigned_ip="10.0.0.2", 

258 public_key="test-key-2", 

259 public_auth_key="{}", 

260 public_auth_kid="test-kid-2", 

261 is_ephemeral=True, 

262 last_config_refresh=timezone.now() - timedelta(minutes=5), 

263 ) 

264 

265 # 3. A non-ephemeral host (should not be removed regardless of last_config_refresh) 

266 non_ephemeral_host = Host.objects.create( 

267 network=network, 

268 name="non-ephemeral-host", 

269 assigned_ip="10.0.0.3", 

270 public_key="test-key-3", 

271 public_auth_key="{}", 

272 public_auth_kid="test-kid-3", 

273 is_ephemeral=False, 

274 last_config_refresh=timezone.now() - timedelta(minutes=30), 

275 ) 

276 

277 response = client.post( 

278 "/api/v1/cleanup-ephemeral", 

279 **host_auth_headers, 

280 ) 

281 

282 assert response.status_code == 200 

283 result = response.json() 

284 assert result["removed_count"] == 1 

285 assert not Host.objects.filter(id=stale_host.id).exists() 

286 assert Host.objects.filter(id=recent_host.id).exists() 

287 assert Host.objects.filter(id=non_ephemeral_host.id).exists() 

288 

289 # Call the endpoint again - should remove no hosts 

290 response = client.post( 

291 "/api/v1/cleanup-ephemeral", 

292 **host_auth_headers, 

293 ) 

294 assert response.status_code == 200 

295 result = response.json() 

296 assert result["removed_count"] == 0 

297 

298 

299def test_enrollment_api_with_jwt(test_network, client): 

300 network = test_network(name="testnet", cidr="10.0.0.0/24") 

301 template = create_template( 

302 "jwt_template", 

303 network.name, 

304 ) 

305 token = generate_enrollment_token(template) 

306 auth_key = JWK.generate(kty="RSA", size=2048) 

307 _, public_net_key = create_keys() 

308 enrollment_data = schemas.ClientEnrollment( 

309 enrollment_key=token, 

310 public_net_key=public_net_key, 

311 public_auth_key=auth_key.export_public(), 

312 preferred_hostname="test-host", 

313 public_ip="127.0.0.1", 

314 enroll_on_existence=False, 

315 ) 

316 response = client.post( 

317 "/api/v1/enroll", 

318 data=enrollment_data.model_dump_json(), 

319 content_type="application/json", 

320 ) 

321 assert response.status_code == 200 

322 assert Host.objects.filter(name="test-host").exists() 

323 

324 

325def test_download_nebula_binary(client, mocker): 

326 mock_content = b"mock binary content" 

327 mock_file = mocker.mock_open(read_data=mock_content) 

328 mocker.patch("builtins.open", mock_file) 

329 mocker.patch("meshadmin.server.assets.asset_path") 

330 mocker.patch.object(Path, "exists", return_value=True) 

331 

332 # Test valid download for Linux (x86_64) 

333 response = client.get("/api/v1/nebula/download/Linux/x86_64/nebula") 

334 assert response.status_code == 200 

335 assert response["Content-Type"] == "application/octet-stream" 

336 assert response["Content-Disposition"] == 'attachment; filename="nebula"' 

337 content = b"".join(response.streaming_content) 

338 assert content == mock_content 

339 

340 # Test valid download for Linux (aarch64) 

341 response = client.get("/api/v1/nebula/download/Linux/aarch64/nebula") 

342 assert response.status_code == 200 

343 

344 # Test valid download for Darwin (arm64) 

345 response = client.get("/api/v1/nebula/download/Darwin/arm64/nebula-cert") 

346 assert response.status_code == 200 

347 

348 # Test invalid OS 

349 response = client.get("/api/v1/nebula/download/Windows/x86_64/nebula") 

350 assert response.status_code == 400 

351 assert "Only Linux and Darwin are supported" in str(response.content) 

352 

353 # Test invalid architecture for Darwin 

354 response = client.get("/api/v1/nebula/download/Darwin/x86_64/nebula") 

355 assert response.status_code == 400 

356 assert "Supported architectures for Darwin: ['arm64']" in str(response.content) 

357 

358 # Test invalid architecture for Linux 

359 response = client.get("/api/v1/nebula/download/Linux/arm64/nebula") 

360 assert response.status_code == 400 

361 assert "Supported architectures for Linux: ['aarch64', 'x86_64']" in str( 

362 response.content 

363 ) 

364 

365 # Test invalid binary name 

366 response = client.get("/api/v1/nebula/download/Linux/x86_64/invalid") 

367 assert response.status_code == 400 

368 

369 # Test binary not found 

370 mocker.patch.object(Path, "exists", return_value=False) 

371 response = client.get("/api/v1/nebula/download/Linux/x86_64/nebula") 

372 assert response.status_code == 404 

373 assert "Binary not found" in str(response.content)