ctfy.sdk.node_client
HTTP client for platform → node machine calls.
Every call is authenticated with the target node's bearer token —
minted per node at registration and used in both directions. End users
never touch this client; they talk to the platform, which proxies to
the node. Node endpoints are defined in
server/routes/node_instances.py.
1"""HTTP client for platform → node machine calls. 2 3Every call is authenticated with the target node's bearer token — 4minted per node at registration and used in both directions. End users 5never touch this client; they talk to the platform, which proxies to 6the node. Node endpoints are defined in 7``server/routes/node_instances.py``. 8""" 9 10from __future__ import annotations 11 12import json 13from typing import Any 14 15import httpx 16 17from ctfy.core.constants import DEFAULT_CLIENT_TIMEOUT 18from ctfy.core.exceptions import NodeRequestError 19from ctfy.sdk.base import BaseHttpClient 20 21 22def _raise_for_node_status(resp: httpx.Response) -> None: 23 """Like ``resp.raise_for_status()`` but preserves the node's error body. 24 25 On a 4xx/5xx, pull the FastAPI ``{"detail": ...}`` field (or the raw 26 body as a fallback) and raise :class:`NodeRequestError` so the real 27 failure reaches the caller instead of httpx's generic status string. 28 """ 29 if not resp.is_error: 30 return 31 detail = "" 32 try: 33 body = resp.json() 34 except (json.JSONDecodeError, ValueError): 35 body = None 36 if isinstance(body, dict): 37 d = body.get("detail") 38 if isinstance(d, str): 39 detail = d 40 elif d is not None: 41 detail = json.dumps(d) 42 if not detail: 43 detail = (resp.text or "").strip() 44 raise NodeRequestError(resp.status_code, detail) 45 46 47class NodeClient(BaseHttpClient): 48 """Thin sync HTTP client; one instance per node URL.""" 49 50 def __init__( 51 self, 52 node_url: str, 53 token: str, 54 *, 55 timeout: int = DEFAULT_CLIENT_TIMEOUT, 56 ) -> None: 57 super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout) 58 59 # -- lifecycle ---------------------------------------------------------- 60 61 def start_instance( 62 self, 63 *, 64 challenge_id: str, 65 instance_id: str, 66 ttl: int, 67 answers: dict[str, str], 68 proxy_output_dir: str | None = None, 69 ) -> dict[str, Any]: 70 resp = self.request( 71 "POST", 72 "/instances", 73 json={ 74 "challenge_id": challenge_id, 75 "instance_id": instance_id, 76 "ttl": ttl, 77 "answers": answers, 78 "proxy_output_dir": proxy_output_dir, 79 }, 80 ) 81 _raise_for_node_status(resp) 82 body: dict[str, Any] = resp.json() 83 return body 84 85 def stop_instance(self, instance_id: str) -> None: 86 resp = self.request("DELETE", f"/instances/{instance_id}") 87 resp.raise_for_status() 88 89 def stop_all(self) -> None: 90 resp = self.request("POST", "/admin/stop-all") 91 resp.raise_for_status() 92 93 def rescan_challenges(self) -> dict[str, Any]: 94 """Tell the node to drop its spec cache and re-scan challenges_dir. 95 96 Returns ``{total, added, removed}`` so the platform can report 97 the per-node outcome of a cluster-wide rescan.""" 98 resp = self.request("POST", "/admin/rescan-challenges") 99 resp.raise_for_status() 100 body: dict[str, Any] = resp.json() 101 return body 102 103 # -- admin pre-build (image cache warming) ------------------------------ 104 105 def build_challenge(self, challenge_id: str) -> dict[str, Any]: 106 """Ask the node to pre-build images for *challenge_id*. 107 108 Fires-and-returns: the node persists ``status="building"`` and 109 spawns a daemon thread; the body returned here is that initial 110 state row. Polling :meth:`get_build_state` is how the platform 111 learns when it lands on ``built`` / ``failed``. 112 """ 113 resp = self.request("POST", f"/admin/challenges/{challenge_id}/build") 114 _raise_for_node_status(resp) 115 body: dict[str, Any] = resp.json() 116 return body 117 118 def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]: 119 """Queue background pre-build on the node. 120 121 With ``challenge_ids=None`` the node builds every spec it knows. 122 With an explicit list (the platform scoping a bulk build to a 123 competition's challenge set) only those are built. Returns 124 ``{queued, skipped_built, skipped_in_progress}`` so the platform 125 can report per-node what got picked up. Sequential on the node 126 side — no fan-out across the corpus. 127 """ 128 kwargs: dict[str, Any] = {} 129 if challenge_ids is not None: 130 kwargs["json"] = {"challenge_ids": challenge_ids} 131 resp = self.request("POST", "/admin/challenges/build-all", **kwargs) 132 _raise_for_node_status(resp) 133 body: dict[str, Any] = resp.json() 134 return body 135 136 def get_build_state(self) -> dict[str, Any]: 137 """Fetch every per-challenge build-state row on this node. 138 139 Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``. 140 ``status`` is one of ``unbuilt`` / ``building`` / ``built`` / 141 ``failed``; ``unbuilt`` placeholders are synthesised for specs 142 the node has seen but never been asked to build. 143 """ 144 resp = self.request("GET", "/admin/challenges/build-state") 145 _raise_for_node_status(resp) 146 body: dict[str, Any] = resp.json() 147 return body 148 149 def pull_challenge(self, challenge_id: str) -> dict[str, Any]: 150 """Ask the node to pre-pull registry images for *challenge_id*. 151 152 Pull-side twin of :meth:`build_challenge`: the node persists 153 ``status="pulling"`` and spawns a daemon thread; poll 154 :meth:`get_pull_state` for the ``pulled`` / ``failed`` landing. 155 """ 156 resp = self.request("POST", f"/admin/challenges/{challenge_id}/pull") 157 _raise_for_node_status(resp) 158 body: dict[str, Any] = resp.json() 159 return body 160 161 def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]: 162 """Queue background pre-pull on the node. 163 164 With ``challenge_ids=None`` the node pulls every spec it knows; 165 with an explicit list only those (the platform scoping to a 166 competition). Returns ``{queued, skipped_pulled, 167 skipped_in_progress}``. 168 """ 169 kwargs: dict[str, Any] = {} 170 if challenge_ids is not None: 171 kwargs["json"] = {"challenge_ids": challenge_ids} 172 resp = self.request("POST", "/admin/challenges/pull-all", **kwargs) 173 _raise_for_node_status(resp) 174 body: dict[str, Any] = resp.json() 175 return body 176 177 def get_pull_state(self) -> dict[str, Any]: 178 """Fetch every per-challenge pull-state row on this node. 179 180 Returns ``{"rows": [{challenge_id, status, pulled_at, error}, …]}``. 181 ``status`` is one of ``unpulled`` / ``pulling`` / ``pulled`` / 182 ``failed``; ``unpulled`` placeholders are synthesised for specs 183 the node has seen but never been asked to pull. 184 """ 185 resp = self.request("GET", "/admin/challenges/pull-state") 186 _raise_for_node_status(resp) 187 body: dict[str, Any] = resp.json() 188 return body 189 190 # -- status / health ---------------------------------------------------- 191 192 def get_status(self, instance_id: str) -> dict[str, Any]: 193 resp = self.request("GET", f"/instances/{instance_id}/status") 194 resp.raise_for_status() 195 body: dict[str, Any] = resp.json() 196 return body 197 198 def check_health(self, instance_id: str) -> bool: 199 resp = self.request("GET", f"/instances/{instance_id}/health") 200 resp.raise_for_status() 201 return bool(resp.json().get("is_healthy")) 202 203 def node_health(self) -> dict[str, Any]: 204 """Liveness + ``{running, capacity}`` for heartbeat.""" 205 resp = self.request("GET", "/health") 206 resp.raise_for_status() 207 body: dict[str, Any] = resp.json() 208 return body 209 210 # -- traffic / runtime -------------------------------------------------- 211 212 def get_traffic(self, instance_id: str) -> dict[str, Any]: 213 """Fetch mitmproxy flow data; file lives on the node's FS.""" 214 resp = self.request("GET", f"/instances/{instance_id}/traffic") 215 resp.raise_for_status() 216 body: dict[str, Any] = resp.json() 217 return body 218 219 def list_containers(self, instance_id: str) -> list[dict[str, Any]]: 220 """Enumerate every container in an instance's compose project. 221 222 Backs the admin-shell feature: the platform forwards a 223 ``GET /admin/instances/{id}/containers`` to the assigned node, 224 which returns one row per container (challenge services and 225 platform-injected sidecars alike). The WebSocket reverse-proxy 226 is opened separately and does not go through ``NodeClient``. 227 """ 228 resp = self.request("GET", f"/instances/{instance_id}/containers") 229 _raise_for_node_status(resp) 230 body: list[dict[str, Any]] = resp.json() 231 return body 232 233 def get_container_logs(self, instance_id: str) -> str: 234 """Fetch combined container stdout/stderr for archival.""" 235 resp = self.request("GET", f"/instances/{instance_id}/container-logs") 236 resp.raise_for_status() 237 return str(resp.json().get("logs") or "") 238 239 def get_pcap(self, instance_id: str) -> bytes: 240 """Fetch the raw tcpdump capture for *instance_id*. 241 242 Returns an empty ``bytes`` when no capture exists (sidecar 243 disabled, instance never reached the running state, etc.) so 244 callers can persist conditionally without try/except. 245 """ 246 resp = self.request("GET", f"/instances/{instance_id}/pcap") 247 if resp.status_code == 404: 248 return b"" 249 resp.raise_for_status() 250 return resp.content 251 252 def get_agent_runtime(self, instance_id: str) -> dict[str, Any]: 253 """Provider-agnostic runtime hints for attaching an agent (proxy URL, 254 CA PEM, optional Docker-specific names). Replaces the old 255 sandbox-network shape.""" 256 resp = self.request("GET", f"/instances/{instance_id}/agent-runtime") 257 resp.raise_for_status() 258 body: dict[str, Any] = resp.json() 259 return body 260 261 # -- per-instance rendered attachments --------------------------------- 262 263 def list_instance_attachments(self, instance_id: str) -> dict[str, Any]: 264 """List per-instance attachments rendered into the node's workdir. 265 266 Returns the raw JSON dict — caller projects through 267 :class:`AttachmentList` for typing. Used by the platform's 268 per-instance attachment endpoint to surface team-specific 269 rendered file listings.""" 270 resp = self.request("GET", f"/instances/{instance_id}/attachments") 271 resp.raise_for_status() 272 body: dict[str, Any] = resp.json() 273 return body 274 275 def get_openvpn_config(self, instance_id: str) -> bytes: 276 """Fetch the per-instance OpenVPN client config from the node. 277 278 The node reads ``/etc/openvpn/client.ovpn`` out of the instance's 279 ``openvpn`` service container (generated on first boot by the 280 ``ctfy/openvpn-base`` image's bootstrap). Returns the body 281 verbatim — the platform-side route adds the 282 ``Content-Disposition`` header before re-emitting to the player. 283 284 Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's 285 proxy route catches 404 and re-emits to the player, anything 286 else is treated as a 502. 287 """ 288 resp = self.request("GET", f"/instances/{instance_id}/openvpn-config") 289 resp.raise_for_status() 290 return resp.content 291 292 def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]: 293 """Download one per-instance attachment as ``(bytes, content_type)``. 294 295 Buffers the whole body — attachments are bounded (typical case 296 is a 10 KB binary or a small text file). For huge captures the 297 caller should hand the player a CDN URL instead. 298 299 Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's 300 proxy route catches 404 and re-emits to the player, anything 301 else is a 502.""" 302 resp = self.request( 303 "GET", 304 f"/instances/{instance_id}/attachments/{filename}", 305 ) 306 resp.raise_for_status() 307 return resp.content, resp.headers.get("content-type", "application/octet-stream")
48class NodeClient(BaseHttpClient): 49 """Thin sync HTTP client; one instance per node URL.""" 50 51 def __init__( 52 self, 53 node_url: str, 54 token: str, 55 *, 56 timeout: int = DEFAULT_CLIENT_TIMEOUT, 57 ) -> None: 58 super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout) 59 60 # -- lifecycle ---------------------------------------------------------- 61 62 def start_instance( 63 self, 64 *, 65 challenge_id: str, 66 instance_id: str, 67 ttl: int, 68 answers: dict[str, str], 69 proxy_output_dir: str | None = None, 70 ) -> dict[str, Any]: 71 resp = self.request( 72 "POST", 73 "/instances", 74 json={ 75 "challenge_id": challenge_id, 76 "instance_id": instance_id, 77 "ttl": ttl, 78 "answers": answers, 79 "proxy_output_dir": proxy_output_dir, 80 }, 81 ) 82 _raise_for_node_status(resp) 83 body: dict[str, Any] = resp.json() 84 return body 85 86 def stop_instance(self, instance_id: str) -> None: 87 resp = self.request("DELETE", f"/instances/{instance_id}") 88 resp.raise_for_status() 89 90 def stop_all(self) -> None: 91 resp = self.request("POST", "/admin/stop-all") 92 resp.raise_for_status() 93 94 def rescan_challenges(self) -> dict[str, Any]: 95 """Tell the node to drop its spec cache and re-scan challenges_dir. 96 97 Returns ``{total, added, removed}`` so the platform can report 98 the per-node outcome of a cluster-wide rescan.""" 99 resp = self.request("POST", "/admin/rescan-challenges") 100 resp.raise_for_status() 101 body: dict[str, Any] = resp.json() 102 return body 103 104 # -- admin pre-build (image cache warming) ------------------------------ 105 106 def build_challenge(self, challenge_id: str) -> dict[str, Any]: 107 """Ask the node to pre-build images for *challenge_id*. 108 109 Fires-and-returns: the node persists ``status="building"`` and 110 spawns a daemon thread; the body returned here is that initial 111 state row. Polling :meth:`get_build_state` is how the platform 112 learns when it lands on ``built`` / ``failed``. 113 """ 114 resp = self.request("POST", f"/admin/challenges/{challenge_id}/build") 115 _raise_for_node_status(resp) 116 body: dict[str, Any] = resp.json() 117 return body 118 119 def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]: 120 """Queue background pre-build on the node. 121 122 With ``challenge_ids=None`` the node builds every spec it knows. 123 With an explicit list (the platform scoping a bulk build to a 124 competition's challenge set) only those are built. Returns 125 ``{queued, skipped_built, skipped_in_progress}`` so the platform 126 can report per-node what got picked up. Sequential on the node 127 side — no fan-out across the corpus. 128 """ 129 kwargs: dict[str, Any] = {} 130 if challenge_ids is not None: 131 kwargs["json"] = {"challenge_ids": challenge_ids} 132 resp = self.request("POST", "/admin/challenges/build-all", **kwargs) 133 _raise_for_node_status(resp) 134 body: dict[str, Any] = resp.json() 135 return body 136 137 def get_build_state(self) -> dict[str, Any]: 138 """Fetch every per-challenge build-state row on this node. 139 140 Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``. 141 ``status`` is one of ``unbuilt`` / ``building`` / ``built`` / 142 ``failed``; ``unbuilt`` placeholders are synthesised for specs 143 the node has seen but never been asked to build. 144 """ 145 resp = self.request("GET", "/admin/challenges/build-state") 146 _raise_for_node_status(resp) 147 body: dict[str, Any] = resp.json() 148 return body 149 150 def pull_challenge(self, challenge_id: str) -> dict[str, Any]: 151 """Ask the node to pre-pull registry images for *challenge_id*. 152 153 Pull-side twin of :meth:`build_challenge`: the node persists 154 ``status="pulling"`` and spawns a daemon thread; poll 155 :meth:`get_pull_state` for the ``pulled`` / ``failed`` landing. 156 """ 157 resp = self.request("POST", f"/admin/challenges/{challenge_id}/pull") 158 _raise_for_node_status(resp) 159 body: dict[str, Any] = resp.json() 160 return body 161 162 def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]: 163 """Queue background pre-pull on the node. 164 165 With ``challenge_ids=None`` the node pulls every spec it knows; 166 with an explicit list only those (the platform scoping to a 167 competition). Returns ``{queued, skipped_pulled, 168 skipped_in_progress}``. 169 """ 170 kwargs: dict[str, Any] = {} 171 if challenge_ids is not None: 172 kwargs["json"] = {"challenge_ids": challenge_ids} 173 resp = self.request("POST", "/admin/challenges/pull-all", **kwargs) 174 _raise_for_node_status(resp) 175 body: dict[str, Any] = resp.json() 176 return body 177 178 def get_pull_state(self) -> dict[str, Any]: 179 """Fetch every per-challenge pull-state row on this node. 180 181 Returns ``{"rows": [{challenge_id, status, pulled_at, error}, …]}``. 182 ``status`` is one of ``unpulled`` / ``pulling`` / ``pulled`` / 183 ``failed``; ``unpulled`` placeholders are synthesised for specs 184 the node has seen but never been asked to pull. 185 """ 186 resp = self.request("GET", "/admin/challenges/pull-state") 187 _raise_for_node_status(resp) 188 body: dict[str, Any] = resp.json() 189 return body 190 191 # -- status / health ---------------------------------------------------- 192 193 def get_status(self, instance_id: str) -> dict[str, Any]: 194 resp = self.request("GET", f"/instances/{instance_id}/status") 195 resp.raise_for_status() 196 body: dict[str, Any] = resp.json() 197 return body 198 199 def check_health(self, instance_id: str) -> bool: 200 resp = self.request("GET", f"/instances/{instance_id}/health") 201 resp.raise_for_status() 202 return bool(resp.json().get("is_healthy")) 203 204 def node_health(self) -> dict[str, Any]: 205 """Liveness + ``{running, capacity}`` for heartbeat.""" 206 resp = self.request("GET", "/health") 207 resp.raise_for_status() 208 body: dict[str, Any] = resp.json() 209 return body 210 211 # -- traffic / runtime -------------------------------------------------- 212 213 def get_traffic(self, instance_id: str) -> dict[str, Any]: 214 """Fetch mitmproxy flow data; file lives on the node's FS.""" 215 resp = self.request("GET", f"/instances/{instance_id}/traffic") 216 resp.raise_for_status() 217 body: dict[str, Any] = resp.json() 218 return body 219 220 def list_containers(self, instance_id: str) -> list[dict[str, Any]]: 221 """Enumerate every container in an instance's compose project. 222 223 Backs the admin-shell feature: the platform forwards a 224 ``GET /admin/instances/{id}/containers`` to the assigned node, 225 which returns one row per container (challenge services and 226 platform-injected sidecars alike). The WebSocket reverse-proxy 227 is opened separately and does not go through ``NodeClient``. 228 """ 229 resp = self.request("GET", f"/instances/{instance_id}/containers") 230 _raise_for_node_status(resp) 231 body: list[dict[str, Any]] = resp.json() 232 return body 233 234 def get_container_logs(self, instance_id: str) -> str: 235 """Fetch combined container stdout/stderr for archival.""" 236 resp = self.request("GET", f"/instances/{instance_id}/container-logs") 237 resp.raise_for_status() 238 return str(resp.json().get("logs") or "") 239 240 def get_pcap(self, instance_id: str) -> bytes: 241 """Fetch the raw tcpdump capture for *instance_id*. 242 243 Returns an empty ``bytes`` when no capture exists (sidecar 244 disabled, instance never reached the running state, etc.) so 245 callers can persist conditionally without try/except. 246 """ 247 resp = self.request("GET", f"/instances/{instance_id}/pcap") 248 if resp.status_code == 404: 249 return b"" 250 resp.raise_for_status() 251 return resp.content 252 253 def get_agent_runtime(self, instance_id: str) -> dict[str, Any]: 254 """Provider-agnostic runtime hints for attaching an agent (proxy URL, 255 CA PEM, optional Docker-specific names). Replaces the old 256 sandbox-network shape.""" 257 resp = self.request("GET", f"/instances/{instance_id}/agent-runtime") 258 resp.raise_for_status() 259 body: dict[str, Any] = resp.json() 260 return body 261 262 # -- per-instance rendered attachments --------------------------------- 263 264 def list_instance_attachments(self, instance_id: str) -> dict[str, Any]: 265 """List per-instance attachments rendered into the node's workdir. 266 267 Returns the raw JSON dict — caller projects through 268 :class:`AttachmentList` for typing. Used by the platform's 269 per-instance attachment endpoint to surface team-specific 270 rendered file listings.""" 271 resp = self.request("GET", f"/instances/{instance_id}/attachments") 272 resp.raise_for_status() 273 body: dict[str, Any] = resp.json() 274 return body 275 276 def get_openvpn_config(self, instance_id: str) -> bytes: 277 """Fetch the per-instance OpenVPN client config from the node. 278 279 The node reads ``/etc/openvpn/client.ovpn`` out of the instance's 280 ``openvpn`` service container (generated on first boot by the 281 ``ctfy/openvpn-base`` image's bootstrap). Returns the body 282 verbatim — the platform-side route adds the 283 ``Content-Disposition`` header before re-emitting to the player. 284 285 Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's 286 proxy route catches 404 and re-emits to the player, anything 287 else is treated as a 502. 288 """ 289 resp = self.request("GET", f"/instances/{instance_id}/openvpn-config") 290 resp.raise_for_status() 291 return resp.content 292 293 def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]: 294 """Download one per-instance attachment as ``(bytes, content_type)``. 295 296 Buffers the whole body — attachments are bounded (typical case 297 is a 10 KB binary or a small text file). For huge captures the 298 caller should hand the player a CDN URL instead. 299 300 Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's 301 proxy route catches 404 and re-emits to the player, anything 302 else is a 502.""" 303 resp = self.request( 304 "GET", 305 f"/instances/{instance_id}/attachments/{filename}", 306 ) 307 resp.raise_for_status() 308 return resp.content, resp.headers.get("content-type", "application/octet-stream")
Thin sync HTTP client; one instance per node URL.
62 def start_instance( 63 self, 64 *, 65 challenge_id: str, 66 instance_id: str, 67 ttl: int, 68 answers: dict[str, str], 69 proxy_output_dir: str | None = None, 70 ) -> dict[str, Any]: 71 resp = self.request( 72 "POST", 73 "/instances", 74 json={ 75 "challenge_id": challenge_id, 76 "instance_id": instance_id, 77 "ttl": ttl, 78 "answers": answers, 79 "proxy_output_dir": proxy_output_dir, 80 }, 81 ) 82 _raise_for_node_status(resp) 83 body: dict[str, Any] = resp.json() 84 return body
94 def rescan_challenges(self) -> dict[str, Any]: 95 """Tell the node to drop its spec cache and re-scan challenges_dir. 96 97 Returns ``{total, added, removed}`` so the platform can report 98 the per-node outcome of a cluster-wide rescan.""" 99 resp = self.request("POST", "/admin/rescan-challenges") 100 resp.raise_for_status() 101 body: dict[str, Any] = resp.json() 102 return body
Tell the node to drop its spec cache and re-scan challenges_dir.
Returns {total, added, removed} so the platform can report
the per-node outcome of a cluster-wide rescan.
106 def build_challenge(self, challenge_id: str) -> dict[str, Any]: 107 """Ask the node to pre-build images for *challenge_id*. 108 109 Fires-and-returns: the node persists ``status="building"`` and 110 spawns a daemon thread; the body returned here is that initial 111 state row. Polling :meth:`get_build_state` is how the platform 112 learns when it lands on ``built`` / ``failed``. 113 """ 114 resp = self.request("POST", f"/admin/challenges/{challenge_id}/build") 115 _raise_for_node_status(resp) 116 body: dict[str, Any] = resp.json() 117 return body
Ask the node to pre-build images for challenge_id.
Fires-and-returns: the node persists status="building" and
spawns a daemon thread; the body returned here is that initial
state row. Polling get_build_state() is how the platform
learns when it lands on built / failed.
119 def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]: 120 """Queue background pre-build on the node. 121 122 With ``challenge_ids=None`` the node builds every spec it knows. 123 With an explicit list (the platform scoping a bulk build to a 124 competition's challenge set) only those are built. Returns 125 ``{queued, skipped_built, skipped_in_progress}`` so the platform 126 can report per-node what got picked up. Sequential on the node 127 side — no fan-out across the corpus. 128 """ 129 kwargs: dict[str, Any] = {} 130 if challenge_ids is not None: 131 kwargs["json"] = {"challenge_ids": challenge_ids} 132 resp = self.request("POST", "/admin/challenges/build-all", **kwargs) 133 _raise_for_node_status(resp) 134 body: dict[str, Any] = resp.json() 135 return body
Queue background pre-build on the node.
With challenge_ids=None the node builds every spec it knows.
With an explicit list (the platform scoping a bulk build to a
competition's challenge set) only those are built. Returns
{queued, skipped_built, skipped_in_progress} so the platform
can report per-node what got picked up. Sequential on the node
side — no fan-out across the corpus.
137 def get_build_state(self) -> dict[str, Any]: 138 """Fetch every per-challenge build-state row on this node. 139 140 Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``. 141 ``status`` is one of ``unbuilt`` / ``building`` / ``built`` / 142 ``failed``; ``unbuilt`` placeholders are synthesised for specs 143 the node has seen but never been asked to build. 144 """ 145 resp = self.request("GET", "/admin/challenges/build-state") 146 _raise_for_node_status(resp) 147 body: dict[str, Any] = resp.json() 148 return body
Fetch every per-challenge build-state row on this node.
Returns {"rows": [{challenge_id, status, built_at, error}, …]}.
status is one of unbuilt / building / built /
failed; unbuilt placeholders are synthesised for specs
the node has seen but never been asked to build.
150 def pull_challenge(self, challenge_id: str) -> dict[str, Any]: 151 """Ask the node to pre-pull registry images for *challenge_id*. 152 153 Pull-side twin of :meth:`build_challenge`: the node persists 154 ``status="pulling"`` and spawns a daemon thread; poll 155 :meth:`get_pull_state` for the ``pulled`` / ``failed`` landing. 156 """ 157 resp = self.request("POST", f"/admin/challenges/{challenge_id}/pull") 158 _raise_for_node_status(resp) 159 body: dict[str, Any] = resp.json() 160 return body
Ask the node to pre-pull registry images for challenge_id.
Pull-side twin of build_challenge(): the node persists
status="pulling" and spawns a daemon thread; poll
get_pull_state() for the pulled / failed landing.
162 def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]: 163 """Queue background pre-pull on the node. 164 165 With ``challenge_ids=None`` the node pulls every spec it knows; 166 with an explicit list only those (the platform scoping to a 167 competition). Returns ``{queued, skipped_pulled, 168 skipped_in_progress}``. 169 """ 170 kwargs: dict[str, Any] = {} 171 if challenge_ids is not None: 172 kwargs["json"] = {"challenge_ids": challenge_ids} 173 resp = self.request("POST", "/admin/challenges/pull-all", **kwargs) 174 _raise_for_node_status(resp) 175 body: dict[str, Any] = resp.json() 176 return body
Queue background pre-pull on the node.
With challenge_ids=None the node pulls every spec it knows;
with an explicit list only those (the platform scoping to a
competition). Returns {queued, skipped_pulled,
skipped_in_progress}.
178 def get_pull_state(self) -> dict[str, Any]: 179 """Fetch every per-challenge pull-state row on this node. 180 181 Returns ``{"rows": [{challenge_id, status, pulled_at, error}, …]}``. 182 ``status`` is one of ``unpulled`` / ``pulling`` / ``pulled`` / 183 ``failed``; ``unpulled`` placeholders are synthesised for specs 184 the node has seen but never been asked to pull. 185 """ 186 resp = self.request("GET", "/admin/challenges/pull-state") 187 _raise_for_node_status(resp) 188 body: dict[str, Any] = resp.json() 189 return body
Fetch every per-challenge pull-state row on this node.
Returns {"rows": [{challenge_id, status, pulled_at, error}, …]}.
status is one of unpulled / pulling / pulled /
failed; unpulled placeholders are synthesised for specs
the node has seen but never been asked to pull.
204 def node_health(self) -> dict[str, Any]: 205 """Liveness + ``{running, capacity}`` for heartbeat.""" 206 resp = self.request("GET", "/health") 207 resp.raise_for_status() 208 body: dict[str, Any] = resp.json() 209 return body
Liveness + {running, capacity} for heartbeat.
213 def get_traffic(self, instance_id: str) -> dict[str, Any]: 214 """Fetch mitmproxy flow data; file lives on the node's FS.""" 215 resp = self.request("GET", f"/instances/{instance_id}/traffic") 216 resp.raise_for_status() 217 body: dict[str, Any] = resp.json() 218 return body
Fetch mitmproxy flow data; file lives on the node's FS.
220 def list_containers(self, instance_id: str) -> list[dict[str, Any]]: 221 """Enumerate every container in an instance's compose project. 222 223 Backs the admin-shell feature: the platform forwards a 224 ``GET /admin/instances/{id}/containers`` to the assigned node, 225 which returns one row per container (challenge services and 226 platform-injected sidecars alike). The WebSocket reverse-proxy 227 is opened separately and does not go through ``NodeClient``. 228 """ 229 resp = self.request("GET", f"/instances/{instance_id}/containers") 230 _raise_for_node_status(resp) 231 body: list[dict[str, Any]] = resp.json() 232 return body
Enumerate every container in an instance's compose project.
Backs the admin-shell feature: the platform forwards a
GET /admin/instances/{id}/containers to the assigned node,
which returns one row per container (challenge services and
platform-injected sidecars alike). The WebSocket reverse-proxy
is opened separately and does not go through NodeClient.
234 def get_container_logs(self, instance_id: str) -> str: 235 """Fetch combined container stdout/stderr for archival.""" 236 resp = self.request("GET", f"/instances/{instance_id}/container-logs") 237 resp.raise_for_status() 238 return str(resp.json().get("logs") or "")
Fetch combined container stdout/stderr for archival.
240 def get_pcap(self, instance_id: str) -> bytes: 241 """Fetch the raw tcpdump capture for *instance_id*. 242 243 Returns an empty ``bytes`` when no capture exists (sidecar 244 disabled, instance never reached the running state, etc.) so 245 callers can persist conditionally without try/except. 246 """ 247 resp = self.request("GET", f"/instances/{instance_id}/pcap") 248 if resp.status_code == 404: 249 return b"" 250 resp.raise_for_status() 251 return resp.content
Fetch the raw tcpdump capture for instance_id.
Returns an empty bytes when no capture exists (sidecar
disabled, instance never reached the running state, etc.) so
callers can persist conditionally without try/except.
253 def get_agent_runtime(self, instance_id: str) -> dict[str, Any]: 254 """Provider-agnostic runtime hints for attaching an agent (proxy URL, 255 CA PEM, optional Docker-specific names). Replaces the old 256 sandbox-network shape.""" 257 resp = self.request("GET", f"/instances/{instance_id}/agent-runtime") 258 resp.raise_for_status() 259 body: dict[str, Any] = resp.json() 260 return body
Provider-agnostic runtime hints for attaching an agent (proxy URL, CA PEM, optional Docker-specific names). Replaces the old sandbox-network shape.
264 def list_instance_attachments(self, instance_id: str) -> dict[str, Any]: 265 """List per-instance attachments rendered into the node's workdir. 266 267 Returns the raw JSON dict — caller projects through 268 :class:`AttachmentList` for typing. Used by the platform's 269 per-instance attachment endpoint to surface team-specific 270 rendered file listings.""" 271 resp = self.request("GET", f"/instances/{instance_id}/attachments") 272 resp.raise_for_status() 273 body: dict[str, Any] = resp.json() 274 return body
List per-instance attachments rendered into the node's workdir.
Returns the raw JSON dict — caller projects through
AttachmentList for typing. Used by the platform's
per-instance attachment endpoint to surface team-specific
rendered file listings.
276 def get_openvpn_config(self, instance_id: str) -> bytes: 277 """Fetch the per-instance OpenVPN client config from the node. 278 279 The node reads ``/etc/openvpn/client.ovpn`` out of the instance's 280 ``openvpn`` service container (generated on first boot by the 281 ``ctfy/openvpn-base`` image's bootstrap). Returns the body 282 verbatim — the platform-side route adds the 283 ``Content-Disposition`` header before re-emitting to the player. 284 285 Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's 286 proxy route catches 404 and re-emits to the player, anything 287 else is treated as a 502. 288 """ 289 resp = self.request("GET", f"/instances/{instance_id}/openvpn-config") 290 resp.raise_for_status() 291 return resp.content
Fetch the per-instance OpenVPN client config from the node.
The node reads /etc/openvpn/client.ovpn out of the instance's
openvpn service container (generated on first boot by the
ctfy/openvpn-base image's bootstrap). Returns the body
verbatim — the platform-side route adds the
Content-Disposition header before re-emitting to the player.
Raises httpx.HTTPStatusError on non-2xx — the platform's
proxy route catches 404 and re-emits to the player, anything
else is treated as a 502.
293 def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]: 294 """Download one per-instance attachment as ``(bytes, content_type)``. 295 296 Buffers the whole body — attachments are bounded (typical case 297 is a 10 KB binary or a small text file). For huge captures the 298 caller should hand the player a CDN URL instead. 299 300 Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's 301 proxy route catches 404 and re-emits to the player, anything 302 else is a 502.""" 303 resp = self.request( 304 "GET", 305 f"/instances/{instance_id}/attachments/{filename}", 306 ) 307 resp.raise_for_status() 308 return resp.content, resp.headers.get("content-type", "application/octet-stream")
Download one per-instance attachment as (bytes, content_type).
Buffers the whole body — attachments are bounded (typical case is a 10 KB binary or a small text file). For huge captures the caller should hand the player a CDN URL instead.
Raises httpx.HTTPStatusError on non-2xx — the platform's
proxy route catches 404 and re-emits to the player, anything
else is a 502.