Coverage for src / npm_mcp / client.py: 88%
202 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-21 05:57 +0400
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-21 05:57 +0400
1"""NPM API client with authentication handling."""
3import os
4from typing import Optional, List
5import httpx
6from .models import (
7 ProxyHost, Certificate, AccessList, RedirectionHost,
8 Stream, DeadHost, User, Setting, AuditLogEntry, NPMConfig,
9)
12class NPMClientError(Exception):
13 pass
16class NPMConfigError(NPMClientError):
17 pass
20class NPMAuthenticationError(NPMClientError):
21 pass
24class NPMNetworkError(NPMClientError):
25 pass
28class NPMClient:
29 def __init__(self, config: NPMConfig):
30 self.config = config
31 self.base_url = config.url.rstrip("/") if config.url else ""
32 self._token: Optional[str] = None
33 self._client = httpx.AsyncClient(timeout=30.0)
35 def _validate_config(self):
36 if not self.config.url: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true
37 raise NPMConfigError("NPM_URL is required but not set")
38 if not self.config.email: 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true
39 raise NPMConfigError("NPM_EMAIL is required but not set")
40 if not self.config.password: 40 ↛ 41line 40 didn't jump to line 41 because the condition on line 40 was never true
41 raise NPMConfigError("NPM_PASSWORD is required but not set")
43 async def _get_token(self) -> str:
44 if self._token:
45 return self._token
47 self._validate_config()
49 try:
50 response = await self._client.post(
51 f"{self.base_url}/api/tokens",
52 json={
53 "identity": self.config.email,
54 "secret": self.config.password,
55 },
56 )
57 response.raise_for_status()
58 data = response.json()
59 self._token = data["token"]
60 return self._token
61 except httpx.HTTPStatusError as e:
62 if e.response.status_code == 401:
63 raise NPMAuthenticationError(
64 "Authentication failed: invalid NPM email or password"
65 ) from e
66 raise NPMNetworkError(f"HTTP error {e.response.status_code}: {e.response.text}") from e
67 except httpx.ConnectError as e:
68 raise NPMNetworkError(
69 f"Cannot connect to NPM at {self.base_url}. Check NPM_URL is correct and NPM is running."
70 ) from e
71 except httpx.TimeoutException as e:
72 raise NPMNetworkError(f"Request to NPM timed out after 30s") from e
73 except Exception as e:
74 raise NPMNetworkError(f"Unexpected error connecting to NPM: {e}") from e
76 async def _request(
77 self,
78 method: str,
79 path: str,
80 retry_auth: bool = True,
81 **kwargs,
82 ) -> httpx.Response:
83 try:
84 token = await self._get_token()
85 headers = kwargs.pop("headers", {})
86 headers["Authorization"] = f"Bearer {token}"
88 response = await self._client.request(
89 method,
90 f"{self.base_url}{path}",
91 headers=headers,
92 **kwargs,
93 )
95 if response.status_code == 401 and retry_auth:
96 self._token = None
97 return await self._request(method, path, retry_auth=False, **kwargs)
99 response.raise_for_status()
100 return response
101 except httpx.HTTPStatusError as e:
102 raise NPMNetworkError(
103 f"HTTP {e.response.status_code} error on {method} {path}: {e.response.text}"
104 ) from e
105 except httpx.ConnectError as e:
106 raise NPMNetworkError(
107 f"Cannot connect to NPM at {self.base_url}. Check network connectivity."
108 ) from e
109 except httpx.TimeoutException as e:
110 raise NPMNetworkError(f"Request to NPM timed out after 30s") from e
112 async def list_proxy_hosts(self) -> List[ProxyHost]:
113 response = await self._request("GET", "/api/nginx/proxy-hosts")
114 return [ProxyHost(**host) for host in response.json()]
116 async def get_proxy_host(self, host_id: int) -> ProxyHost:
117 response = await self._request("GET", f"/api/nginx/proxy-hosts/{host_id}")
118 return ProxyHost(**response.json())
120 async def create_proxy_host(self, host: ProxyHost) -> ProxyHost:
121 response = await self._request(
122 "POST",
123 "/api/nginx/proxy-hosts",
124 json=host.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on"}),
125 )
126 return ProxyHost(**response.json())
128 async def update_proxy_host(self, host_id: int, host: ProxyHost) -> ProxyHost:
129 response = await self._request(
130 "PUT",
131 f"/api/nginx/proxy-hosts/{host_id}",
132 json=host.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on", "owner_user_id"}),
133 )
134 return ProxyHost(**response.json())
136 async def delete_proxy_host(self, host_id: int) -> None:
137 await self._request("DELETE", f"/api/nginx/proxy-hosts/{host_id}")
139 async def enable_proxy_host(self, host_id: int) -> None:
140 await self._request("POST", f"/api/nginx/proxy-hosts/{host_id}/enable")
142 async def disable_proxy_host(self, host_id: int) -> None:
143 await self._request("POST", f"/api/nginx/proxy-hosts/{host_id}/disable")
145 async def list_certificates(self) -> List[Certificate]:
146 response = await self._request("GET", "/api/nginx/certificates")
147 return [Certificate(**cert) for cert in response.json()]
149 async def request_certificate(self, cert: Certificate) -> Certificate:
150 response = await self._request(
151 "POST",
152 "/api/nginx/certificates",
153 json=cert.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on"}),
154 )
155 return Certificate(**response.json())
157 async def get_certificate(self, cert_id: int) -> Certificate:
158 response = await self._request("GET", f"/api/nginx/certificates/{cert_id}")
159 return Certificate(**response.json())
161 async def delete_certificate(self, cert_id: int) -> None:
162 await self._request("DELETE", f"/api/nginx/certificates/{cert_id}")
164 async def renew_certificate(self, cert_id: int) -> Certificate:
165 response = await self._request("POST", f"/api/nginx/certificates/{cert_id}/renew")
166 return Certificate(**response.json())
168 async def list_dns_providers(self) -> list[dict]:
169 response = await self._request("GET", "/api/nginx/certificates/dns-providers")
170 return response.json()
172 async def test_http_challenge(self, domains: List[str]) -> dict:
173 response = await self._request(
174 "POST", "/api/nginx/certificates/test-http", json={"domains": domains}
175 )
176 return response.json()
178 async def list_access_lists(self) -> List[AccessList]:
179 response = await self._request("GET", "/api/nginx/access-lists")
180 return [AccessList(**al) for al in response.json()]
182 async def create_access_list(self, access_list: AccessList) -> AccessList:
183 response = await self._request(
184 "POST",
185 "/api/nginx/access-lists",
186 json=access_list.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on"}),
187 )
188 return AccessList(**response.json())
190 async def update_access_list(self, access_list_id: int, access_list: AccessList) -> AccessList:
191 response = await self._request(
192 "PUT",
193 f"/api/nginx/access-lists/{access_list_id}",
194 json=access_list.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on"}),
195 )
196 return AccessList(**response.json())
198 async def get_access_list(self, access_list_id: int) -> AccessList:
199 response = await self._request("GET", f"/api/nginx/access-lists/{access_list_id}")
200 return AccessList(**response.json())
202 async def delete_access_list(self, access_list_id: int) -> None:
203 await self._request("DELETE", f"/api/nginx/access-lists/{access_list_id}")
205 async def list_redirection_hosts(self) -> List[RedirectionHost]:
206 response = await self._request("GET", "/api/nginx/redirection-hosts")
207 return [RedirectionHost(**h) for h in response.json()]
209 async def get_redirection_host(self, host_id: int) -> RedirectionHost:
210 response = await self._request("GET", f"/api/nginx/redirection-hosts/{host_id}")
211 return RedirectionHost(**response.json())
213 async def create_redirection_host(self, host: RedirectionHost) -> RedirectionHost:
214 response = await self._request(
215 "POST", "/api/nginx/redirection-hosts",
216 json=host.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on"}),
217 )
218 return RedirectionHost(**response.json())
220 async def update_redirection_host(self, host_id: int, host: RedirectionHost) -> RedirectionHost:
221 response = await self._request(
222 "PUT", f"/api/nginx/redirection-hosts/{host_id}",
223 json=host.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on", "owner_user_id"}),
224 )
225 return RedirectionHost(**response.json())
227 async def delete_redirection_host(self, host_id: int) -> None:
228 await self._request("DELETE", f"/api/nginx/redirection-hosts/{host_id}")
230 async def enable_redirection_host(self, host_id: int) -> None:
231 await self._request("POST", f"/api/nginx/redirection-hosts/{host_id}/enable")
233 async def disable_redirection_host(self, host_id: int) -> None:
234 await self._request("POST", f"/api/nginx/redirection-hosts/{host_id}/disable")
236 async def list_streams(self) -> List[Stream]:
237 response = await self._request("GET", "/api/nginx/streams")
238 return [Stream(**s) for s in response.json()]
240 async def get_stream(self, stream_id: int) -> Stream:
241 response = await self._request("GET", f"/api/nginx/streams/{stream_id}")
242 return Stream(**response.json())
244 async def create_stream(self, stream: Stream) -> Stream:
245 response = await self._request(
246 "POST", "/api/nginx/streams",
247 json=stream.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on"}),
248 )
249 return Stream(**response.json())
251 async def update_stream(self, stream_id: int, stream: Stream) -> Stream:
252 response = await self._request(
253 "PUT", f"/api/nginx/streams/{stream_id}",
254 json=stream.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on", "owner_user_id"}),
255 )
256 return Stream(**response.json())
258 async def delete_stream(self, stream_id: int) -> None:
259 await self._request("DELETE", f"/api/nginx/streams/{stream_id}")
261 async def enable_stream(self, stream_id: int) -> None:
262 await self._request("POST", f"/api/nginx/streams/{stream_id}/enable")
264 async def disable_stream(self, stream_id: int) -> None:
265 await self._request("POST", f"/api/nginx/streams/{stream_id}/disable")
267 async def list_dead_hosts(self) -> List[DeadHost]:
268 response = await self._request("GET", "/api/nginx/dead-hosts")
269 return [DeadHost(**h) for h in response.json()]
271 async def get_dead_host(self, host_id: int) -> DeadHost:
272 response = await self._request("GET", f"/api/nginx/dead-hosts/{host_id}")
273 return DeadHost(**response.json())
275 async def create_dead_host(self, host: DeadHost) -> DeadHost:
276 response = await self._request(
277 "POST", "/api/nginx/dead-hosts",
278 json=host.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on"}),
279 )
280 return DeadHost(**response.json())
282 async def update_dead_host(self, host_id: int, host: DeadHost) -> DeadHost:
283 response = await self._request(
284 "PUT", f"/api/nginx/dead-hosts/{host_id}",
285 json=host.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on", "owner_user_id"}),
286 )
287 return DeadHost(**response.json())
289 async def delete_dead_host(self, host_id: int) -> None:
290 await self._request("DELETE", f"/api/nginx/dead-hosts/{host_id}")
292 async def enable_dead_host(self, host_id: int) -> None:
293 await self._request("POST", f"/api/nginx/dead-hosts/{host_id}/enable")
295 async def disable_dead_host(self, host_id: int) -> None:
296 await self._request("POST", f"/api/nginx/dead-hosts/{host_id}/disable")
298 async def list_users(self) -> List[User]:
299 response = await self._request("GET", "/api/users")
300 return [User(**u) for u in response.json()]
302 async def get_user(self, user_id: int) -> User:
303 response = await self._request("GET", f"/api/users/{user_id}")
304 return User(**response.json())
306 async def create_user(self, user: User) -> User:
307 response = await self._request(
308 "POST", "/api/users",
309 json=user.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on"}),
310 )
311 return User(**response.json())
313 async def update_user(self, user_id: int, user: User) -> User:
314 response = await self._request(
315 "PUT", f"/api/users/{user_id}",
316 json=user.model_dump(exclude_none=True, exclude={"id", "created_on", "modified_on"}),
317 )
318 return User(**response.json())
320 async def delete_user(self, user_id: int) -> None:
321 await self._request("DELETE", f"/api/users/{user_id}")
323 async def list_settings(self) -> List[Setting]:
324 response = await self._request("GET", "/api/settings")
325 return [Setting(**s) for s in response.json()]
327 async def get_setting(self, setting_id: str) -> Setting:
328 response = await self._request("GET", f"/api/settings/{setting_id}")
329 return Setting(**response.json())
331 async def update_setting(self, setting_id: str, setting: Setting) -> Setting:
332 response = await self._request(
333 "PUT", f"/api/settings/{setting_id}",
334 json=setting.model_dump(exclude_none=True, exclude={"id"}),
335 )
336 return Setting(**response.json())
338 async def list_audit_log(self) -> List[AuditLogEntry]:
339 response = await self._request("GET", "/api/audit-log")
340 return [AuditLogEntry(**e) for e in response.json()]
342 async def get_host_report(self) -> dict:
343 response = await self._request("GET", "/api/reports/hosts")
344 return response.json()
346 async def close(self):
347 await self._client.aclose()
350def create_client_from_env() -> NPMClient:
351 config = NPMConfig(
352 url=os.getenv("NPM_URL", ""),
353 email=os.getenv("NPM_EMAIL", ""),
354 password=os.getenv("NPM_PASSWORD", ""),
355 )
356 return NPMClient(config)