Coverage for src / npm_mcp / client.py: 97%
252 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-21 06:26 +0400
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-21 06:26 +0400
1"""NPM API client with authentication handling."""
3import os
4import re
5from typing import Optional, List
6import httpx
7from .models import (
8 ProxyHost, Certificate, AccessList, RedirectionHost,
9 Stream, DeadHost, User, Setting, AuditLogEntry, NPMConfig,
10)
class NPMClientError(Exception):
    """Base class for every error raised by the NPM client."""


class NPMConfigError(NPMClientError):
    """Raised when a required setting (URL, email or password) is missing."""


class NPMAuthenticationError(NPMClientError):
    """Raised when the token endpoint rejects the credentials with a 401."""


class NPMNetworkError(NPMClientError):
    """Raised for connection failures, timeouts and non-success HTTP statuses."""
29def _validate_int_id(value, name: str) -> int:
30 """Cast to int and validate positive. Prevents path traversal via string IDs."""
31 try:
32 int_val = int(value)
33 except (TypeError, ValueError):
34 raise ValueError(f"{name} must be a positive integer, got: {type(value).__name__}")
35 if int_val < 1: 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true
36 raise ValueError(f"{name} must be >= 1, got: {int_val}")
37 return int_val
40_SETTING_ID_PATTERN = re.compile(r"^[a-z0-9\-]{1,64}$")
43def _validate_setting_id(value) -> str:
44 """Validate setting_id is a safe alphanumeric-dash string."""
45 s = str(value)
46 if not _SETTING_ID_PATTERN.match(s): 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true
47 raise ValueError(f"setting_id must match [a-z0-9-]{{1,64}}, got: {s!r}")
48 return s
class NPMClient:
    """Async client for the NPM REST API with bearer-token authentication.

    A token is fetched lazily from ``/api/tokens`` on the first request and
    cached on the instance. A 401 response clears the cached token and the
    request is retried once with a fresh one.
    """

    def __init__(self, config: NPMConfig):
        """Store *config* and open the underlying HTTP client.

        Raises:
            NPMConfigError: If the URL, email or password is not set.
        """
        self.config = config
        self.base_url = config.url.rstrip("/") if config.url else ""
        self._token: Optional[str] = None
        # Validate before opening the HTTP client so that a bad config does
        # not leave an unclosed AsyncClient behind.
        self._validate_config()
        self._client = httpx.AsyncClient(timeout=30.0, follow_redirects=False)

    def _validate_config(self):
        """Raise NPMConfigError if URL, email or password is empty."""
        if not self.config.url:
            raise NPMConfigError("NPM_URL is required but not set")
        if not self.config.email:
            raise NPMConfigError("NPM_EMAIL is required but not set")
        if not self.config.password.get_secret_value():
            raise NPMConfigError("NPM_PASSWORD is required but not set")

    async def _get_token(self) -> str:
        """Return the cached token, logging in via ``/api/tokens`` if needed.

        Raises:
            NPMAuthenticationError: If the login is rejected with a 401.
            NPMNetworkError: For any other HTTP status error, a connection
                failure, a timeout, or an unexpected error while logging in.
        """
        if self._token:
            return self._token

        try:
            response = await self._client.post(
                f"{self.base_url}/api/tokens",
                json={
                    "identity": self.config.email,
                    "secret": self.config.password.get_secret_value(),
                },
            )
            response.raise_for_status()
            data = response.json()
            self._token = data["token"]
            return self._token
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 401:
                raise NPMAuthenticationError(
                    "Authentication failed: invalid NPM email or password"
                ) from e
            raise NPMNetworkError(f"HTTP error {e.response.status_code}") from e
        except httpx.ConnectError as e:
            raise NPMNetworkError(
                "Cannot connect to NPM. Check NPM_URL and that NPM is running."
            ) from e
        except httpx.TimeoutException as e:
            raise NPMNetworkError("Request to NPM timed out") from e
        except NPMClientError:
            raise
        except Exception as e:
            # Only the exception type is reported, not its message.
            raise NPMNetworkError(f"Unexpected error: {type(e).__name__}") from e

    async def _request(
        self,
        method: str,
        path: str,
        retry_auth: bool = True,
        **kwargs,
    ) -> httpx.Response:
        """Send an authenticated request to ``base_url + path``.

        Args:
            method: HTTP method name.
            path: Path appended to the base URL.
            retry_auth: When true, a 401 response clears the cached token and
                the request is repeated once.
            **kwargs: Passed through to ``httpx.AsyncClient.request``; a
                ``headers`` entry is merged with the Authorization header.

        Returns:
            The successful response.

        Raises:
            NPMAuthenticationError: If obtaining a token fails with a 401.
            NPMNetworkError: On a non-success status, connection failure or
                timeout.
        """
        try:
            token = await self._get_token()
            # Work on a copy so the caller's dict is never mutated, and keep
            # it so caller-supplied headers survive the 401 retry below.
            headers = dict(kwargs.pop("headers", None) or {})
            headers["Authorization"] = f"Bearer {token}"

            response = await self._client.request(
                method,
                f"{self.base_url}{path}",
                headers=headers,
                **kwargs,
            )

            if response.status_code == 401 and retry_auth:
                # Token probably expired: drop it and retry once. The stale
                # Authorization value is overwritten by the recursive call.
                self._token = None
                return await self._request(
                    method, path, retry_auth=False, headers=headers, **kwargs
                )

            response.raise_for_status()
            return response
        except NPMClientError:
            raise
        except httpx.HTTPStatusError as e:
            raise NPMNetworkError(
                f"HTTP {e.response.status_code} error on {method} {path}"
            ) from e
        except httpx.ConnectError as e:
            raise NPMNetworkError(
                "Cannot connect to NPM. Check network connectivity."
            ) from e
        except httpx.TimeoutException as e:
            raise NPMNetworkError("Request to NPM timed out") from e

    # ---- Proxy hosts -----------------------------------------------------

    async def list_proxy_hosts(self) -> List[ProxyHost]:
        """Return all proxy hosts."""
        response = await self._request("GET", "/api/nginx/proxy-hosts")
        return [ProxyHost(**host) for host in response.json()]

    async def get_proxy_host(self, host_id: int) -> ProxyHost:
        """Return the proxy host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        response = await self._request("GET", f"/api/nginx/proxy-hosts/{host_id}")
        return ProxyHost(**response.json())

    async def create_proxy_host(self, host: ProxyHost) -> ProxyHost:
        """Create a proxy host; ID and timestamps are left out of the payload."""
        response = await self._request(
            "POST",
            "/api/nginx/proxy-hosts",
            json=host.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on"},
            ),
        )
        return ProxyHost(**response.json())

    async def update_proxy_host(self, host_id: int, host: ProxyHost) -> ProxyHost:
        """Update a proxy host; ID, timestamps and owner are not sent."""
        host_id = _validate_int_id(host_id, "host_id")
        response = await self._request(
            "PUT",
            f"/api/nginx/proxy-hosts/{host_id}",
            json=host.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on", "owner_user_id"},
            ),
        )
        return ProxyHost(**response.json())

    async def delete_proxy_host(self, host_id: int) -> None:
        """Delete the proxy host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        await self._request("DELETE", f"/api/nginx/proxy-hosts/{host_id}")

    async def enable_proxy_host(self, host_id: int) -> None:
        """Enable the proxy host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        await self._request("POST", f"/api/nginx/proxy-hosts/{host_id}/enable")

    async def disable_proxy_host(self, host_id: int) -> None:
        """Disable the proxy host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        await self._request("POST", f"/api/nginx/proxy-hosts/{host_id}/disable")

    # ---- Certificates ----------------------------------------------------

    async def list_certificates(self) -> List[Certificate]:
        """Return all certificates."""
        response = await self._request("GET", "/api/nginx/certificates")
        return [Certificate(**cert) for cert in response.json()]

    async def request_certificate(self, cert: Certificate) -> Certificate:
        """Request a new certificate; ID and timestamps are not sent."""
        response = await self._request(
            "POST",
            "/api/nginx/certificates",
            json=cert.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on"},
            ),
        )
        return Certificate(**response.json())

    async def get_certificate(self, cert_id: int) -> Certificate:
        """Return the certificate with the given ID."""
        cert_id = _validate_int_id(cert_id, "certificate_id")
        response = await self._request("GET", f"/api/nginx/certificates/{cert_id}")
        return Certificate(**response.json())

    async def delete_certificate(self, cert_id: int) -> None:
        """Delete the certificate with the given ID."""
        cert_id = _validate_int_id(cert_id, "certificate_id")
        await self._request("DELETE", f"/api/nginx/certificates/{cert_id}")

    async def renew_certificate(self, cert_id: int) -> Certificate:
        """Renew the certificate with the given ID and return the result."""
        cert_id = _validate_int_id(cert_id, "certificate_id")
        response = await self._request(
            "POST", f"/api/nginx/certificates/{cert_id}/renew"
        )
        return Certificate(**response.json())

    async def list_dns_providers(self) -> list[dict]:
        """Return the decoded JSON body of the DNS providers endpoint."""
        response = await self._request("GET", "/api/nginx/certificates/dns-providers")
        return response.json()

    async def test_http_challenge(self, domains: List[str]) -> dict:
        """Post *domains* to the test-http endpoint and return its JSON body."""
        response = await self._request(
            "POST", "/api/nginx/certificates/test-http", json={"domains": domains}
        )
        return response.json()

    # ---- Access lists ----------------------------------------------------

    async def list_access_lists(self) -> List[AccessList]:
        """Return all access lists."""
        response = await self._request("GET", "/api/nginx/access-lists")
        return [AccessList(**al) for al in response.json()]

    async def create_access_list(self, access_list: AccessList) -> AccessList:
        """Create an access list; ID and timestamps are not sent."""
        response = await self._request(
            "POST",
            "/api/nginx/access-lists",
            json=access_list.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on"},
            ),
        )
        return AccessList(**response.json())

    async def update_access_list(
        self, access_list_id: int, access_list: AccessList
    ) -> AccessList:
        """Update an access list; ID and timestamps are not sent."""
        access_list_id = _validate_int_id(access_list_id, "access_list_id")
        response = await self._request(
            "PUT",
            f"/api/nginx/access-lists/{access_list_id}",
            json=access_list.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on"},
            ),
        )
        return AccessList(**response.json())

    async def get_access_list(self, access_list_id: int) -> AccessList:
        """Return the access list with the given ID."""
        access_list_id = _validate_int_id(access_list_id, "access_list_id")
        response = await self._request(
            "GET", f"/api/nginx/access-lists/{access_list_id}"
        )
        return AccessList(**response.json())

    async def delete_access_list(self, access_list_id: int) -> None:
        """Delete the access list with the given ID."""
        access_list_id = _validate_int_id(access_list_id, "access_list_id")
        await self._request("DELETE", f"/api/nginx/access-lists/{access_list_id}")

    # ---- Redirection hosts -----------------------------------------------

    async def list_redirection_hosts(self) -> List[RedirectionHost]:
        """Return all redirection hosts."""
        response = await self._request("GET", "/api/nginx/redirection-hosts")
        return [RedirectionHost(**h) for h in response.json()]

    async def get_redirection_host(self, host_id: int) -> RedirectionHost:
        """Return the redirection host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        response = await self._request(
            "GET", f"/api/nginx/redirection-hosts/{host_id}"
        )
        return RedirectionHost(**response.json())

    async def create_redirection_host(self, host: RedirectionHost) -> RedirectionHost:
        """Create a redirection host; ID and timestamps are not sent."""
        response = await self._request(
            "POST",
            "/api/nginx/redirection-hosts",
            json=host.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on"},
            ),
        )
        return RedirectionHost(**response.json())

    async def update_redirection_host(
        self, host_id: int, host: RedirectionHost
    ) -> RedirectionHost:
        """Update a redirection host; ID, timestamps and owner are not sent."""
        host_id = _validate_int_id(host_id, "host_id")
        response = await self._request(
            "PUT",
            f"/api/nginx/redirection-hosts/{host_id}",
            json=host.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on", "owner_user_id"},
            ),
        )
        return RedirectionHost(**response.json())

    async def delete_redirection_host(self, host_id: int) -> None:
        """Delete the redirection host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        await self._request("DELETE", f"/api/nginx/redirection-hosts/{host_id}")

    async def enable_redirection_host(self, host_id: int) -> None:
        """Enable the redirection host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        await self._request("POST", f"/api/nginx/redirection-hosts/{host_id}/enable")

    async def disable_redirection_host(self, host_id: int) -> None:
        """Disable the redirection host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        await self._request("POST", f"/api/nginx/redirection-hosts/{host_id}/disable")

    # ---- Streams ---------------------------------------------------------

    async def list_streams(self) -> List[Stream]:
        """Return all streams."""
        response = await self._request("GET", "/api/nginx/streams")
        return [Stream(**s) for s in response.json()]

    async def get_stream(self, stream_id: int) -> Stream:
        """Return the stream with the given ID."""
        stream_id = _validate_int_id(stream_id, "stream_id")
        response = await self._request("GET", f"/api/nginx/streams/{stream_id}")
        return Stream(**response.json())

    async def create_stream(self, stream: Stream) -> Stream:
        """Create a stream; ID and timestamps are not sent."""
        response = await self._request(
            "POST",
            "/api/nginx/streams",
            json=stream.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on"},
            ),
        )
        return Stream(**response.json())

    async def update_stream(self, stream_id: int, stream: Stream) -> Stream:
        """Update a stream; ID, timestamps and owner are not sent."""
        stream_id = _validate_int_id(stream_id, "stream_id")
        response = await self._request(
            "PUT",
            f"/api/nginx/streams/{stream_id}",
            json=stream.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on", "owner_user_id"},
            ),
        )
        return Stream(**response.json())

    async def delete_stream(self, stream_id: int) -> None:
        """Delete the stream with the given ID."""
        stream_id = _validate_int_id(stream_id, "stream_id")
        await self._request("DELETE", f"/api/nginx/streams/{stream_id}")

    async def enable_stream(self, stream_id: int) -> None:
        """Enable the stream with the given ID."""
        stream_id = _validate_int_id(stream_id, "stream_id")
        await self._request("POST", f"/api/nginx/streams/{stream_id}/enable")

    async def disable_stream(self, stream_id: int) -> None:
        """Disable the stream with the given ID."""
        stream_id = _validate_int_id(stream_id, "stream_id")
        await self._request("POST", f"/api/nginx/streams/{stream_id}/disable")

    # ---- Dead hosts ------------------------------------------------------

    async def list_dead_hosts(self) -> List[DeadHost]:
        """Return all dead hosts."""
        response = await self._request("GET", "/api/nginx/dead-hosts")
        return [DeadHost(**h) for h in response.json()]

    async def get_dead_host(self, host_id: int) -> DeadHost:
        """Return the dead host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        response = await self._request("GET", f"/api/nginx/dead-hosts/{host_id}")
        return DeadHost(**response.json())

    async def create_dead_host(self, host: DeadHost) -> DeadHost:
        """Create a dead host; ID and timestamps are not sent."""
        response = await self._request(
            "POST",
            "/api/nginx/dead-hosts",
            json=host.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on"},
            ),
        )
        return DeadHost(**response.json())

    async def update_dead_host(self, host_id: int, host: DeadHost) -> DeadHost:
        """Update a dead host; ID, timestamps and owner are not sent."""
        host_id = _validate_int_id(host_id, "host_id")
        response = await self._request(
            "PUT",
            f"/api/nginx/dead-hosts/{host_id}",
            json=host.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on", "owner_user_id"},
            ),
        )
        return DeadHost(**response.json())

    async def delete_dead_host(self, host_id: int) -> None:
        """Delete the dead host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        await self._request("DELETE", f"/api/nginx/dead-hosts/{host_id}")

    async def enable_dead_host(self, host_id: int) -> None:
        """Enable the dead host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        await self._request("POST", f"/api/nginx/dead-hosts/{host_id}/enable")

    async def disable_dead_host(self, host_id: int) -> None:
        """Disable the dead host with the given ID."""
        host_id = _validate_int_id(host_id, "host_id")
        await self._request("POST", f"/api/nginx/dead-hosts/{host_id}/disable")

    # ---- Users -----------------------------------------------------------

    async def list_users(self) -> List[User]:
        """Return all users."""
        response = await self._request("GET", "/api/users")
        return [User(**u) for u in response.json()]

    async def get_user(self, user_id: int) -> User:
        """Return the user with the given ID."""
        user_id = _validate_int_id(user_id, "user_id")
        response = await self._request("GET", f"/api/users/{user_id}")
        return User(**response.json())

    async def create_user(self, user: User) -> User:
        """Create a user; ID and timestamps are not sent."""
        response = await self._request(
            "POST",
            "/api/users",
            json=user.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on"},
            ),
        )
        return User(**response.json())

    async def update_user(self, user_id: int, user: User) -> User:
        """Update a user; ID and timestamps are not sent."""
        user_id = _validate_int_id(user_id, "user_id")
        response = await self._request(
            "PUT",
            f"/api/users/{user_id}",
            json=user.model_dump(
                exclude_none=True,
                exclude={"id", "created_on", "modified_on"},
            ),
        )
        return User(**response.json())

    async def delete_user(self, user_id: int) -> None:
        """Delete the user with the given ID."""
        user_id = _validate_int_id(user_id, "user_id")
        await self._request("DELETE", f"/api/users/{user_id}")

    # ---- Settings, audit log, reports ------------------------------------

    async def list_settings(self) -> List[Setting]:
        """Return all settings."""
        response = await self._request("GET", "/api/settings")
        return [Setting(**s) for s in response.json()]

    async def get_setting(self, setting_id: str) -> Setting:
        """Return the setting with the given (validated) string ID."""
        setting_id = _validate_setting_id(setting_id)
        response = await self._request("GET", f"/api/settings/{setting_id}")
        return Setting(**response.json())

    async def update_setting(self, setting_id: str, setting: Setting) -> Setting:
        """Update a setting; its ID is not sent in the payload."""
        setting_id = _validate_setting_id(setting_id)
        response = await self._request(
            "PUT",
            f"/api/settings/{setting_id}",
            json=setting.model_dump(exclude_none=True, exclude={"id"}),
        )
        return Setting(**response.json())

    async def list_audit_log(self) -> List[AuditLogEntry]:
        """Return all audit log entries."""
        response = await self._request("GET", "/api/audit-log")
        return [AuditLogEntry(**e) for e in response.json()]

    async def get_host_report(self) -> dict:
        """Return the decoded JSON body of the hosts report endpoint."""
        response = await self._request("GET", "/api/reports/hosts")
        return response.json()

    async def close(self):
        """Close the underlying HTTP client."""
        await self._client.aclose()
def create_client_from_env() -> NPMClient:
    """Build an NPMClient from the NPM_URL, NPM_EMAIL and NPM_PASSWORD env vars.

    Unset variables default to the empty string.

    Returns:
        A client configured from the current environment.
    """
    config = NPMConfig(
        url=os.getenv("NPM_URL", ""),
        email=os.getenv("NPM_EMAIL", ""),
        password=os.getenv("NPM_PASSWORD", ""),
    )
    return NPMClient(config)