ctfy.sdk.node_client

HTTP client for platform → node machine calls.

Every call is authenticated with the target node's bearer token — minted per node at registration and used in both directions. End users never touch this client; they talk to the platform, which proxies to the node. Node endpoints are defined in server/routes/node_instances.py.

  1"""HTTP client for platform → node machine calls.
  2
  3Every call is authenticated with the target node's bearer token —
  4minted per node at registration and used in both directions. End users
  5never touch this client; they talk to the platform, which proxies to
  6the node. Node endpoints are defined in
  7``server/routes/node_instances.py``.
  8"""
  9
 10from __future__ import annotations
 11
 12import json
 13from typing import Any
 14
 15import httpx
 16
 17from ctfy.core.constants import DEFAULT_CLIENT_TIMEOUT
 18from ctfy.core.exceptions import NodeRequestError
 19from ctfy.sdk.base import BaseHttpClient
 20
 21
 22def _raise_for_node_status(resp: httpx.Response) -> None:
 23    """Like ``resp.raise_for_status()`` but preserves the node's error body.
 24
 25    On a 4xx/5xx, pull the FastAPI ``{"detail": ...}`` field (or the raw
 26    body as a fallback) and raise :class:`NodeRequestError` so the real
 27    failure reaches the caller instead of httpx's generic status string.
 28    """
 29    if not resp.is_error:
 30        return
 31    detail = ""
 32    try:
 33        body = resp.json()
 34    except (json.JSONDecodeError, ValueError):
 35        body = None
 36    if isinstance(body, dict):
 37        d = body.get("detail")
 38        if isinstance(d, str):
 39            detail = d
 40        elif d is not None:
 41            detail = json.dumps(d)
 42    if not detail:
 43        detail = (resp.text or "").strip()
 44    raise NodeRequestError(resp.status_code, detail)
 45
 46
 47class NodeClient(BaseHttpClient):
 48    """Thin sync HTTP client; one instance per node URL."""
 49
 50    def __init__(
 51        self,
 52        node_url: str,
 53        token: str,
 54        *,
 55        timeout: int = DEFAULT_CLIENT_TIMEOUT,
 56    ) -> None:
 57        super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout)
 58
 59    # -- lifecycle ----------------------------------------------------------
 60
 61    def start_instance(
 62        self,
 63        *,
 64        challenge_id: str,
 65        instance_id: str,
 66        ttl: int,
 67        answers: dict[str, str],
 68        proxy_output_dir: str | None = None,
 69    ) -> dict[str, Any]:
 70        resp = self.request(
 71            "POST",
 72            "/instances",
 73            json={
 74                "challenge_id": challenge_id,
 75                "instance_id": instance_id,
 76                "ttl": ttl,
 77                "answers": answers,
 78                "proxy_output_dir": proxy_output_dir,
 79            },
 80        )
 81        _raise_for_node_status(resp)
 82        body: dict[str, Any] = resp.json()
 83        return body
 84
 85    def stop_instance(self, instance_id: str) -> None:
 86        resp = self.request("DELETE", f"/instances/{instance_id}")
 87        resp.raise_for_status()
 88
 89    def stop_all(self) -> None:
 90        resp = self.request("POST", "/admin/stop-all")
 91        resp.raise_for_status()
 92
 93    def rescan_challenges(self) -> dict[str, Any]:
 94        """Tell the node to drop its spec cache and re-scan challenges_dir.
 95
 96        Returns ``{total, added, removed}`` so the platform can report
 97        the per-node outcome of a cluster-wide rescan."""
 98        resp = self.request("POST", "/admin/rescan-challenges")
 99        resp.raise_for_status()
100        body: dict[str, Any] = resp.json()
101        return body
102
103    # -- admin pre-build (image cache warming) ------------------------------
104
105    def build_challenge(self, challenge_id: str) -> dict[str, Any]:
106        """Ask the node to pre-build images for *challenge_id*.
107
108        Fires-and-returns: the node persists ``status="building"`` and
109        spawns a daemon thread; the body returned here is that initial
110        state row. Polling :meth:`get_build_state` is how the platform
111        learns when it lands on ``built`` / ``failed``.
112        """
113        resp = self.request("POST", f"/admin/challenges/{challenge_id}/build")
114        _raise_for_node_status(resp)
115        body: dict[str, Any] = resp.json()
116        return body
117
118    def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
119        """Queue background pre-build on the node.
120
121        With ``challenge_ids=None`` the node builds every spec it knows.
122        With an explicit list (the platform scoping a bulk build to a
123        competition's challenge set) only those are built. Returns
124        ``{queued, skipped_built, skipped_in_progress}`` so the platform
125        can report per-node what got picked up. Sequential on the node
126        side — no fan-out across the corpus.
127        """
128        kwargs: dict[str, Any] = {}
129        if challenge_ids is not None:
130            kwargs["json"] = {"challenge_ids": challenge_ids}
131        resp = self.request("POST", "/admin/challenges/build-all", **kwargs)
132        _raise_for_node_status(resp)
133        body: dict[str, Any] = resp.json()
134        return body
135
136    def get_build_state(self) -> dict[str, Any]:
137        """Fetch every per-challenge build-state row on this node.
138
139        Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``.
140        ``status`` is one of ``unbuilt`` / ``building`` / ``built`` /
141        ``failed``; ``unbuilt`` placeholders are synthesised for specs
142        the node has seen but never been asked to build.
143        """
144        resp = self.request("GET", "/admin/challenges/build-state")
145        _raise_for_node_status(resp)
146        body: dict[str, Any] = resp.json()
147        return body
148
149    def pull_challenge(self, challenge_id: str) -> dict[str, Any]:
150        """Ask the node to pre-pull registry images for *challenge_id*.
151
152        Pull-side twin of :meth:`build_challenge`: the node persists
153        ``status="pulling"`` and spawns a daemon thread; poll
154        :meth:`get_pull_state` for the ``pulled`` / ``failed`` landing.
155        """
156        resp = self.request("POST", f"/admin/challenges/{challenge_id}/pull")
157        _raise_for_node_status(resp)
158        body: dict[str, Any] = resp.json()
159        return body
160
161    def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
162        """Queue background pre-pull on the node.
163
164        With ``challenge_ids=None`` the node pulls every spec it knows;
165        with an explicit list only those (the platform scoping to a
166        competition). Returns ``{queued, skipped_pulled,
167        skipped_in_progress}``.
168        """
169        kwargs: dict[str, Any] = {}
170        if challenge_ids is not None:
171            kwargs["json"] = {"challenge_ids": challenge_ids}
172        resp = self.request("POST", "/admin/challenges/pull-all", **kwargs)
173        _raise_for_node_status(resp)
174        body: dict[str, Any] = resp.json()
175        return body
176
177    def get_pull_state(self) -> dict[str, Any]:
178        """Fetch every per-challenge pull-state row on this node.
179
180        Returns ``{"rows": [{challenge_id, status, pulled_at, error}, …]}``.
181        ``status`` is one of ``unpulled`` / ``pulling`` / ``pulled`` /
182        ``failed``; ``unpulled`` placeholders are synthesised for specs
183        the node has seen but never been asked to pull.
184        """
185        resp = self.request("GET", "/admin/challenges/pull-state")
186        _raise_for_node_status(resp)
187        body: dict[str, Any] = resp.json()
188        return body
189
190    # -- status / health ----------------------------------------------------
191
192    def get_status(self, instance_id: str) -> dict[str, Any]:
193        resp = self.request("GET", f"/instances/{instance_id}/status")
194        resp.raise_for_status()
195        body: dict[str, Any] = resp.json()
196        return body
197
198    def check_health(self, instance_id: str) -> bool:
199        resp = self.request("GET", f"/instances/{instance_id}/health")
200        resp.raise_for_status()
201        return bool(resp.json().get("is_healthy"))
202
203    def node_health(self) -> dict[str, Any]:
204        """Liveness + ``{running, capacity}`` for heartbeat."""
205        resp = self.request("GET", "/health")
206        resp.raise_for_status()
207        body: dict[str, Any] = resp.json()
208        return body
209
210    # -- traffic / runtime --------------------------------------------------
211
212    def get_traffic(self, instance_id: str) -> dict[str, Any]:
213        """Fetch mitmproxy flow data; file lives on the node's FS."""
214        resp = self.request("GET", f"/instances/{instance_id}/traffic")
215        resp.raise_for_status()
216        body: dict[str, Any] = resp.json()
217        return body
218
219    def list_containers(self, instance_id: str) -> list[dict[str, Any]]:
220        """Enumerate every container in an instance's compose project.
221
222        Backs the admin-shell feature: the platform forwards a
223        ``GET /admin/instances/{id}/containers`` to the assigned node,
224        which returns one row per container (challenge services and
225        platform-injected sidecars alike). The WebSocket reverse-proxy
226        is opened separately and does not go through ``NodeClient``.
227        """
228        resp = self.request("GET", f"/instances/{instance_id}/containers")
229        _raise_for_node_status(resp)
230        body: list[dict[str, Any]] = resp.json()
231        return body
232
233    def get_container_logs(self, instance_id: str) -> str:
234        """Fetch combined container stdout/stderr for archival."""
235        resp = self.request("GET", f"/instances/{instance_id}/container-logs")
236        resp.raise_for_status()
237        return str(resp.json().get("logs") or "")
238
239    def get_pcap(self, instance_id: str) -> bytes:
240        """Fetch the raw tcpdump capture for *instance_id*.
241
242        Returns an empty ``bytes`` when no capture exists (sidecar
243        disabled, instance never reached the running state, etc.) so
244        callers can persist conditionally without try/except.
245        """
246        resp = self.request("GET", f"/instances/{instance_id}/pcap")
247        if resp.status_code == 404:
248            return b""
249        resp.raise_for_status()
250        return resp.content
251
252    def get_agent_runtime(self, instance_id: str) -> dict[str, Any]:
253        """Provider-agnostic runtime hints for attaching an agent (proxy URL,
254        CA PEM, optional Docker-specific names). Replaces the old
255        sandbox-network shape."""
256        resp = self.request("GET", f"/instances/{instance_id}/agent-runtime")
257        resp.raise_for_status()
258        body: dict[str, Any] = resp.json()
259        return body
260
261    # -- per-instance rendered attachments ---------------------------------
262
263    def list_instance_attachments(self, instance_id: str) -> dict[str, Any]:
264        """List per-instance attachments rendered into the node's workdir.
265
266        Returns the raw JSON dict — caller projects through
267        :class:`AttachmentList` for typing. Used by the platform's
268        per-instance attachment endpoint to surface team-specific
269        rendered file listings."""
270        resp = self.request("GET", f"/instances/{instance_id}/attachments")
271        resp.raise_for_status()
272        body: dict[str, Any] = resp.json()
273        return body
274
275    def get_openvpn_config(self, instance_id: str) -> bytes:
276        """Fetch the per-instance OpenVPN client config from the node.
277
278        The node reads ``/etc/openvpn/client.ovpn`` out of the instance's
279        ``openvpn`` service container (generated on first boot by the
280        ``ctfy/openvpn-base`` image's bootstrap). Returns the body
281        verbatim — the platform-side route adds the
282        ``Content-Disposition`` header before re-emitting to the player.
283
284        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
285        proxy route catches 404 and re-emits to the player, anything
286        else is treated as a 502.
287        """
288        resp = self.request("GET", f"/instances/{instance_id}/openvpn-config")
289        resp.raise_for_status()
290        return resp.content
291
292    def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
293        """Download one per-instance attachment as ``(bytes, content_type)``.
294
295        Buffers the whole body — attachments are bounded (typical case
296        is a 10 KB binary or a small text file). For huge captures the
297        caller should hand the player a CDN URL instead.
298
299        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
300        proxy route catches 404 and re-emits to the player, anything
301        else is a 502."""
302        resp = self.request(
303            "GET",
304            f"/instances/{instance_id}/attachments/{filename}",
305        )
306        resp.raise_for_status()
307        return resp.content, resp.headers.get("content-type", "application/octet-stream")
class NodeClient(ctfy.sdk.base.BaseHttpClient):
 48class NodeClient(BaseHttpClient):
 49    """Thin sync HTTP client; one instance per node URL."""
 50
 51    def __init__(
 52        self,
 53        node_url: str,
 54        token: str,
 55        *,
 56        timeout: int = DEFAULT_CLIENT_TIMEOUT,
 57    ) -> None:
 58        super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout)
 59
 60    # -- lifecycle ----------------------------------------------------------
 61
 62    def start_instance(
 63        self,
 64        *,
 65        challenge_id: str,
 66        instance_id: str,
 67        ttl: int,
 68        answers: dict[str, str],
 69        proxy_output_dir: str | None = None,
 70    ) -> dict[str, Any]:
 71        resp = self.request(
 72            "POST",
 73            "/instances",
 74            json={
 75                "challenge_id": challenge_id,
 76                "instance_id": instance_id,
 77                "ttl": ttl,
 78                "answers": answers,
 79                "proxy_output_dir": proxy_output_dir,
 80            },
 81        )
 82        _raise_for_node_status(resp)
 83        body: dict[str, Any] = resp.json()
 84        return body
 85
 86    def stop_instance(self, instance_id: str) -> None:
 87        resp = self.request("DELETE", f"/instances/{instance_id}")
 88        resp.raise_for_status()
 89
 90    def stop_all(self) -> None:
 91        resp = self.request("POST", "/admin/stop-all")
 92        resp.raise_for_status()
 93
 94    def rescan_challenges(self) -> dict[str, Any]:
 95        """Tell the node to drop its spec cache and re-scan challenges_dir.
 96
 97        Returns ``{total, added, removed}`` so the platform can report
 98        the per-node outcome of a cluster-wide rescan."""
 99        resp = self.request("POST", "/admin/rescan-challenges")
100        resp.raise_for_status()
101        body: dict[str, Any] = resp.json()
102        return body
103
104    # -- admin pre-build (image cache warming) ------------------------------
105
106    def build_challenge(self, challenge_id: str) -> dict[str, Any]:
107        """Ask the node to pre-build images for *challenge_id*.
108
109        Fires-and-returns: the node persists ``status="building"`` and
110        spawns a daemon thread; the body returned here is that initial
111        state row. Polling :meth:`get_build_state` is how the platform
112        learns when it lands on ``built`` / ``failed``.
113        """
114        resp = self.request("POST", f"/admin/challenges/{challenge_id}/build")
115        _raise_for_node_status(resp)
116        body: dict[str, Any] = resp.json()
117        return body
118
119    def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
120        """Queue background pre-build on the node.
121
122        With ``challenge_ids=None`` the node builds every spec it knows.
123        With an explicit list (the platform scoping a bulk build to a
124        competition's challenge set) only those are built. Returns
125        ``{queued, skipped_built, skipped_in_progress}`` so the platform
126        can report per-node what got picked up. Sequential on the node
127        side — no fan-out across the corpus.
128        """
129        kwargs: dict[str, Any] = {}
130        if challenge_ids is not None:
131            kwargs["json"] = {"challenge_ids": challenge_ids}
132        resp = self.request("POST", "/admin/challenges/build-all", **kwargs)
133        _raise_for_node_status(resp)
134        body: dict[str, Any] = resp.json()
135        return body
136
137    def get_build_state(self) -> dict[str, Any]:
138        """Fetch every per-challenge build-state row on this node.
139
140        Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``.
141        ``status`` is one of ``unbuilt`` / ``building`` / ``built`` /
142        ``failed``; ``unbuilt`` placeholders are synthesised for specs
143        the node has seen but never been asked to build.
144        """
145        resp = self.request("GET", "/admin/challenges/build-state")
146        _raise_for_node_status(resp)
147        body: dict[str, Any] = resp.json()
148        return body
149
150    def pull_challenge(self, challenge_id: str) -> dict[str, Any]:
151        """Ask the node to pre-pull registry images for *challenge_id*.
152
153        Pull-side twin of :meth:`build_challenge`: the node persists
154        ``status="pulling"`` and spawns a daemon thread; poll
155        :meth:`get_pull_state` for the ``pulled`` / ``failed`` landing.
156        """
157        resp = self.request("POST", f"/admin/challenges/{challenge_id}/pull")
158        _raise_for_node_status(resp)
159        body: dict[str, Any] = resp.json()
160        return body
161
162    def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
163        """Queue background pre-pull on the node.
164
165        With ``challenge_ids=None`` the node pulls every spec it knows;
166        with an explicit list only those (the platform scoping to a
167        competition). Returns ``{queued, skipped_pulled,
168        skipped_in_progress}``.
169        """
170        kwargs: dict[str, Any] = {}
171        if challenge_ids is not None:
172            kwargs["json"] = {"challenge_ids": challenge_ids}
173        resp = self.request("POST", "/admin/challenges/pull-all", **kwargs)
174        _raise_for_node_status(resp)
175        body: dict[str, Any] = resp.json()
176        return body
177
178    def get_pull_state(self) -> dict[str, Any]:
179        """Fetch every per-challenge pull-state row on this node.
180
181        Returns ``{"rows": [{challenge_id, status, pulled_at, error}, …]}``.
182        ``status`` is one of ``unpulled`` / ``pulling`` / ``pulled`` /
183        ``failed``; ``unpulled`` placeholders are synthesised for specs
184        the node has seen but never been asked to pull.
185        """
186        resp = self.request("GET", "/admin/challenges/pull-state")
187        _raise_for_node_status(resp)
188        body: dict[str, Any] = resp.json()
189        return body
190
191    # -- status / health ----------------------------------------------------
192
193    def get_status(self, instance_id: str) -> dict[str, Any]:
194        resp = self.request("GET", f"/instances/{instance_id}/status")
195        resp.raise_for_status()
196        body: dict[str, Any] = resp.json()
197        return body
198
199    def check_health(self, instance_id: str) -> bool:
200        resp = self.request("GET", f"/instances/{instance_id}/health")
201        resp.raise_for_status()
202        return bool(resp.json().get("is_healthy"))
203
204    def node_health(self) -> dict[str, Any]:
205        """Liveness + ``{running, capacity}`` for heartbeat."""
206        resp = self.request("GET", "/health")
207        resp.raise_for_status()
208        body: dict[str, Any] = resp.json()
209        return body
210
211    # -- traffic / runtime --------------------------------------------------
212
213    def get_traffic(self, instance_id: str) -> dict[str, Any]:
214        """Fetch mitmproxy flow data; file lives on the node's FS."""
215        resp = self.request("GET", f"/instances/{instance_id}/traffic")
216        resp.raise_for_status()
217        body: dict[str, Any] = resp.json()
218        return body
219
220    def list_containers(self, instance_id: str) -> list[dict[str, Any]]:
221        """Enumerate every container in an instance's compose project.
222
223        Backs the admin-shell feature: the platform forwards a
224        ``GET /admin/instances/{id}/containers`` to the assigned node,
225        which returns one row per container (challenge services and
226        platform-injected sidecars alike). The WebSocket reverse-proxy
227        is opened separately and does not go through ``NodeClient``.
228        """
229        resp = self.request("GET", f"/instances/{instance_id}/containers")
230        _raise_for_node_status(resp)
231        body: list[dict[str, Any]] = resp.json()
232        return body
233
234    def get_container_logs(self, instance_id: str) -> str:
235        """Fetch combined container stdout/stderr for archival."""
236        resp = self.request("GET", f"/instances/{instance_id}/container-logs")
237        resp.raise_for_status()
238        return str(resp.json().get("logs") or "")
239
240    def get_pcap(self, instance_id: str) -> bytes:
241        """Fetch the raw tcpdump capture for *instance_id*.
242
243        Returns an empty ``bytes`` when no capture exists (sidecar
244        disabled, instance never reached the running state, etc.) so
245        callers can persist conditionally without try/except.
246        """
247        resp = self.request("GET", f"/instances/{instance_id}/pcap")
248        if resp.status_code == 404:
249            return b""
250        resp.raise_for_status()
251        return resp.content
252
253    def get_agent_runtime(self, instance_id: str) -> dict[str, Any]:
254        """Provider-agnostic runtime hints for attaching an agent (proxy URL,
255        CA PEM, optional Docker-specific names). Replaces the old
256        sandbox-network shape."""
257        resp = self.request("GET", f"/instances/{instance_id}/agent-runtime")
258        resp.raise_for_status()
259        body: dict[str, Any] = resp.json()
260        return body
261
262    # -- per-instance rendered attachments ---------------------------------
263
264    def list_instance_attachments(self, instance_id: str) -> dict[str, Any]:
265        """List per-instance attachments rendered into the node's workdir.
266
267        Returns the raw JSON dict — caller projects through
268        :class:`AttachmentList` for typing. Used by the platform's
269        per-instance attachment endpoint to surface team-specific
270        rendered file listings."""
271        resp = self.request("GET", f"/instances/{instance_id}/attachments")
272        resp.raise_for_status()
273        body: dict[str, Any] = resp.json()
274        return body
275
276    def get_openvpn_config(self, instance_id: str) -> bytes:
277        """Fetch the per-instance OpenVPN client config from the node.
278
279        The node reads ``/etc/openvpn/client.ovpn`` out of the instance's
280        ``openvpn`` service container (generated on first boot by the
281        ``ctfy/openvpn-base`` image's bootstrap). Returns the body
282        verbatim — the platform-side route adds the
283        ``Content-Disposition`` header before re-emitting to the player.
284
285        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
286        proxy route catches 404 and re-emits to the player, anything
287        else is treated as a 502.
288        """
289        resp = self.request("GET", f"/instances/{instance_id}/openvpn-config")
290        resp.raise_for_status()
291        return resp.content
292
293    def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
294        """Download one per-instance attachment as ``(bytes, content_type)``.
295
296        Buffers the whole body — attachments are bounded (typical case
297        is a 10 KB binary or a small text file). For huge captures the
298        caller should hand the player a CDN URL instead.
299
300        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
301        proxy route catches 404 and re-emits to the player, anything
302        else is a 502."""
303        resp = self.request(
304            "GET",
305            f"/instances/{instance_id}/attachments/{filename}",
306        )
307        resp.raise_for_status()
308        return resp.content, resp.headers.get("content-type", "application/octet-stream")

Thin sync HTTP client; one instance per node URL.

NodeClient(node_url: str, token: str, *, timeout: int = 600)
51    def __init__(
52        self,
53        node_url: str,
54        token: str,
55        *,
56        timeout: int = DEFAULT_CLIENT_TIMEOUT,
57    ) -> None:
58        super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout)
def start_instance( self, *, challenge_id: str, instance_id: str, ttl: int, answers: dict[str, str], proxy_output_dir: str | None = None) -> dict[str, typing.Any]:
62    def start_instance(
63        self,
64        *,
65        challenge_id: str,
66        instance_id: str,
67        ttl: int,
68        answers: dict[str, str],
69        proxy_output_dir: str | None = None,
70    ) -> dict[str, Any]:
71        resp = self.request(
72            "POST",
73            "/instances",
74            json={
75                "challenge_id": challenge_id,
76                "instance_id": instance_id,
77                "ttl": ttl,
78                "answers": answers,
79                "proxy_output_dir": proxy_output_dir,
80            },
81        )
82        _raise_for_node_status(resp)
83        body: dict[str, Any] = resp.json()
84        return body
def stop_instance(self, instance_id: str) -> None:
86    def stop_instance(self, instance_id: str) -> None:
87        resp = self.request("DELETE", f"/instances/{instance_id}")
88        resp.raise_for_status()
def stop_all(self) -> None:
90    def stop_all(self) -> None:
91        resp = self.request("POST", "/admin/stop-all")
92        resp.raise_for_status()
def rescan_challenges(self) -> dict[str, typing.Any]:
 94    def rescan_challenges(self) -> dict[str, Any]:
 95        """Tell the node to drop its spec cache and re-scan challenges_dir.
 96
 97        Returns ``{total, added, removed}`` so the platform can report
 98        the per-node outcome of a cluster-wide rescan."""
 99        resp = self.request("POST", "/admin/rescan-challenges")
100        resp.raise_for_status()
101        body: dict[str, Any] = resp.json()
102        return body

Tell the node to drop its spec cache and re-scan challenges_dir.

Returns {total, added, removed} so the platform can report the per-node outcome of a cluster-wide rescan.

def build_challenge(self, challenge_id: str) -> dict[str, typing.Any]:
106    def build_challenge(self, challenge_id: str) -> dict[str, Any]:
107        """Ask the node to pre-build images for *challenge_id*.
108
109        Fires-and-returns: the node persists ``status="building"`` and
110        spawns a daemon thread; the body returned here is that initial
111        state row. Polling :meth:`get_build_state` is how the platform
112        learns when it lands on ``built`` / ``failed``.
113        """
114        resp = self.request("POST", f"/admin/challenges/{challenge_id}/build")
115        _raise_for_node_status(resp)
116        body: dict[str, Any] = resp.json()
117        return body

Ask the node to pre-build images for challenge_id.

Fires-and-returns: the node persists status="building" and spawns a daemon thread; the body returned here is that initial state row. Polling get_build_state() is how the platform learns when it lands on built / failed.

def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, typing.Any]:
119    def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
120        """Queue background pre-build on the node.
121
122        With ``challenge_ids=None`` the node builds every spec it knows.
123        With an explicit list (the platform scoping a bulk build to a
124        competition's challenge set) only those are built. Returns
125        ``{queued, skipped_built, skipped_in_progress}`` so the platform
126        can report per-node what got picked up. Sequential on the node
127        side — no fan-out across the corpus.
128        """
129        kwargs: dict[str, Any] = {}
130        if challenge_ids is not None:
131            kwargs["json"] = {"challenge_ids": challenge_ids}
132        resp = self.request("POST", "/admin/challenges/build-all", **kwargs)
133        _raise_for_node_status(resp)
134        body: dict[str, Any] = resp.json()
135        return body

Queue background pre-build on the node.

With challenge_ids=None the node builds every spec it knows. With an explicit list (the platform scoping a bulk build to a competition's challenge set) only those are built. Returns {queued, skipped_built, skipped_in_progress} so the platform can report per-node what got picked up. Sequential on the node side — no fan-out across the corpus.

def get_build_state(self) -> dict[str, typing.Any]:
137    def get_build_state(self) -> dict[str, Any]:
138        """Fetch every per-challenge build-state row on this node.
139
140        Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``.
141        ``status`` is one of ``unbuilt`` / ``building`` / ``built`` /
142        ``failed``; ``unbuilt`` placeholders are synthesised for specs
143        the node has seen but never been asked to build.
144        """
145        resp = self.request("GET", "/admin/challenges/build-state")
146        _raise_for_node_status(resp)
147        body: dict[str, Any] = resp.json()
148        return body

Fetch every per-challenge build-state row on this node.

Returns {"rows": [{challenge_id, status, built_at, error}, …]}. status is one of unbuilt / building / built / failed; unbuilt placeholders are synthesised for specs the node has seen but never been asked to build.

def pull_challenge(self, challenge_id: str) -> dict[str, typing.Any]:
150    def pull_challenge(self, challenge_id: str) -> dict[str, Any]:
151        """Ask the node to pre-pull registry images for *challenge_id*.
152
153        Pull-side twin of :meth:`build_challenge`: the node persists
154        ``status="pulling"`` and spawns a daemon thread; poll
155        :meth:`get_pull_state` for the ``pulled`` / ``failed`` landing.
156        """
157        resp = self.request("POST", f"/admin/challenges/{challenge_id}/pull")
158        _raise_for_node_status(resp)
159        body: dict[str, Any] = resp.json()
160        return body

Ask the node to pre-pull registry images for challenge_id.

Pull-side twin of build_challenge(): the node persists status="pulling" and spawns a daemon thread; poll get_pull_state() for the pulled / failed landing.

def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, typing.Any]:
def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
    """Queue a background pre-pull on the node.

    When ``challenge_ids`` is ``None`` no request body is sent and the
    node pulls every spec it knows. When a list is given (the platform
    scoping the pull to a competition) it is sent as
    ``{"challenge_ids": [...]}`` and only those are pulled.

    Returns ``{queued, skipped_pulled, skipped_in_progress}``. Error
    responses are surfaced through ``_raise_for_node_status``.
    """
    # Only attach a JSON body when the caller scoped the request; an
    # explicit empty list is still forwarded, unlike ``None``.
    extra: dict[str, Any] = (
        {} if challenge_ids is None else {"json": {"challenge_ids": challenge_ids}}
    )
    response = self.request("POST", "/admin/challenges/pull-all", **extra)
    _raise_for_node_status(response)
    payload: dict[str, Any] = response.json()
    return payload

Queue background pre-pull on the node.

With challenge_ids=None the node pulls every spec it knows; with an explicit list only those (the platform scoping to a competition). Returns {queued, skipped_pulled, skipped_in_progress}.

def get_pull_state(self) -> dict[str, typing.Any]:
def get_pull_state(self) -> dict[str, Any]:
    """Return the node's pull-state table, one row per challenge.

    The payload has the shape
    ``{"rows": [{challenge_id, status, pulled_at, error}, …]}``. Each
    row's ``status`` is ``unpulled``, ``pulling``, ``pulled`` or
    ``failed``. Specs the node has seen but was never asked to pull
    appear as synthesised ``unpulled`` placeholder rows.

    Error responses are surfaced through ``_raise_for_node_status``.
    """
    response = self.request("GET", "/admin/challenges/pull-state")
    _raise_for_node_status(response)
    payload: dict[str, Any] = response.json()
    return payload

Fetch every per-challenge pull-state row on this node.

Returns {"rows": [{challenge_id, status, pulled_at, error}, …]}. status is one of unpulled / pulling / pulled / failed; unpulled placeholders are synthesised for specs the node has seen but never been asked to pull.

def get_status(self, instance_id: str) -> dict[str, typing.Any]:
def get_status(self, instance_id: str) -> dict[str, Any]:
    """Fetch the node's status document for *instance_id*.

    Issues ``GET /instances/{instance_id}/status`` and returns the
    decoded JSON body unchanged. An error status is raised via the
    response's own ``raise_for_status()``.
    """
    endpoint = f"/instances/{instance_id}/status"
    response = self.request("GET", endpoint)
    response.raise_for_status()
    payload: dict[str, Any] = response.json()
    return payload
def check_health(self, instance_id: str) -> bool:
def check_health(self, instance_id: str) -> bool:
    """Report whether the node considers *instance_id* healthy.

    Issues ``GET /instances/{instance_id}/health`` and returns the
    truthiness of the ``is_healthy`` field in the JSON body; a missing
    field counts as unhealthy. An error status is raised via the
    response's own ``raise_for_status()``.
    """
    endpoint = f"/instances/{instance_id}/health"
    response = self.request("GET", endpoint)
    response.raise_for_status()
    payload = response.json()
    return bool(payload.get("is_healthy"))
def node_health(self) -> dict[str, typing.Any]:
def node_health(self) -> dict[str, Any]:
    """Node liveness probe used for heartbeat.

    Issues ``GET /health`` and returns the decoded JSON body, which
    carries ``{running, capacity}``. An error status is raised via the
    response's own ``raise_for_status()``.
    """
    response = self.request("GET", "/health")
    response.raise_for_status()
    payload: dict[str, Any] = response.json()
    return payload

Liveness + {running, capacity} for heartbeat.

def get_traffic(self, instance_id: str) -> dict[str, typing.Any]:
def get_traffic(self, instance_id: str) -> dict[str, Any]:
    """Retrieve mitmproxy flow data for *instance_id*.

    The flow file lives on the node's filesystem, so it is fetched
    through ``GET /instances/{instance_id}/traffic`` and returned as
    the decoded JSON body. An error status is raised via the
    response's own ``raise_for_status()``.
    """
    endpoint = f"/instances/{instance_id}/traffic"
    response = self.request("GET", endpoint)
    response.raise_for_status()
    payload: dict[str, Any] = response.json()
    return payload

Fetch mitmproxy flow data; file lives on the node's FS.

def list_containers(self, instance_id: str) -> list[dict[str, typing.Any]]:
def list_containers(self, instance_id: str) -> list[dict[str, Any]]:
    """List the containers in an instance's compose project.

    Supports the admin-shell feature. The platform relays its
    ``GET /admin/instances/{id}/containers`` to the assigned node,
    and the node answers with one row per container, covering both
    challenge services and platform-injected sidecars. The WebSocket
    reverse-proxy is opened separately and never passes through
    ``NodeClient``.

    Error responses are surfaced through ``_raise_for_node_status``.
    """
    endpoint = f"/instances/{instance_id}/containers"
    response = self.request("GET", endpoint)
    _raise_for_node_status(response)
    rows: list[dict[str, Any]] = response.json()
    return rows

Enumerate every container in an instance's compose project.

Backs the admin-shell feature: the platform forwards a GET /admin/instances/{id}/containers to the assigned node, which returns one row per container (challenge services and platform-injected sidecars alike). The WebSocket reverse-proxy is opened separately and does not go through NodeClient.

def get_container_logs(self, instance_id: str) -> str:
def get_container_logs(self, instance_id: str) -> str:
    """Retrieve the combined container stdout/stderr for archival.

    Reads the ``logs`` field of the JSON body; a missing or falsy
    value becomes ``""`` and anything else is coerced with ``str``.
    An error status is raised via the response's own
    ``raise_for_status()``.
    """
    endpoint = f"/instances/{instance_id}/container-logs"
    response = self.request("GET", endpoint)
    response.raise_for_status()
    logs = response.json().get("logs")
    return str(logs or "")

Fetch combined container stdout/stderr for archival.

def get_pcap(self, instance_id: str) -> bytes:
def get_pcap(self, instance_id: str) -> bytes:
    """Retrieve the raw tcpdump capture for *instance_id*.

    A 404 from the node means no capture exists (sidecar disabled,
    instance never reached the running state, etc.). That case is
    reported as empty ``bytes`` so callers can persist conditionally
    without a try/except. Any other error status is raised via the
    response's own ``raise_for_status()``.
    """
    response = self.request("GET", f"/instances/{instance_id}/pcap")
    capture_missing = response.status_code == 404
    if not capture_missing:
        response.raise_for_status()
        return response.content
    # Missing capture is an expected outcome, not an error.
    return b""

Fetch the raw tcpdump capture for instance_id.

Returns an empty bytes object when no capture exists (sidecar disabled, instance never reached the running state, etc.) so callers can persist conditionally without try/except.

def get_agent_runtime(self, instance_id: str) -> dict[str, typing.Any]:
def get_agent_runtime(self, instance_id: str) -> dict[str, Any]:
    """Fetch the runtime hints needed to attach an agent to *instance_id*.

    The hints are provider-agnostic: proxy URL, CA PEM, and optional
    Docker-specific names. This supersedes the old sandbox-network
    shape. An error status is raised via the response's own
    ``raise_for_status()``.
    """
    endpoint = f"/instances/{instance_id}/agent-runtime"
    response = self.request("GET", endpoint)
    response.raise_for_status()
    payload: dict[str, Any] = response.json()
    return payload

Provider-agnostic runtime hints for attaching an agent (proxy URL, CA PEM, optional Docker-specific names). Replaces the old sandbox-network shape.

def list_instance_attachments(self, instance_id: str) -> dict[str, typing.Any]:
def list_instance_attachments(self, instance_id: str) -> dict[str, Any]:
    """Enumerate the attachments rendered into the node's workdir for an instance.

    The raw JSON dict is handed back untouched; the caller projects
    it through :class:`AttachmentList` for typing. The platform's
    per-instance attachment endpoint uses this to surface
    team-specific rendered file listings. An error status is raised
    via the response's own ``raise_for_status()``.
    """
    endpoint = f"/instances/{instance_id}/attachments"
    response = self.request("GET", endpoint)
    response.raise_for_status()
    payload: dict[str, Any] = response.json()
    return payload

List per-instance attachments rendered into the node's workdir.

Returns the raw JSON dict — caller projects through AttachmentList for typing. Used by the platform's per-instance attachment endpoint to surface team-specific rendered file listings.

def get_openvpn_config(self, instance_id: str) -> bytes:
def get_openvpn_config(self, instance_id: str) -> bytes:
    """Retrieve the per-instance OpenVPN client config from the node.

    On the node side the file is ``/etc/openvpn/client.ovpn``, read
    out of the instance's ``openvpn`` service container, where the
    ``ctfy/openvpn-base`` image's bootstrap generates it on first
    boot. The response body is returned verbatim; the platform-side
    route adds the ``Content-Disposition`` header before re-emitting
    it to the player.

    Raises ``httpx.HTTPStatusError`` on non-2xx. The platform's proxy
    route catches 404 and re-emits it to the player; anything else
    is treated as a 502.
    """
    endpoint = f"/instances/{instance_id}/openvpn-config"
    response = self.request("GET", endpoint)
    response.raise_for_status()
    config_bytes = response.content
    return config_bytes

Fetch the per-instance OpenVPN client config from the node.

The node reads /etc/openvpn/client.ovpn out of the instance's openvpn service container (generated on first boot by the ctfy/openvpn-base image's bootstrap). Returns the body verbatim — the platform-side route adds the Content-Disposition header before re-emitting to the player.

Raises httpx.HTTPStatusError on non-2xx — the platform's proxy route catches 404 and re-emits to the player, anything else is treated as a 502.

def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
    """Download one per-instance attachment as ``(bytes, content_type)``.

    Buffers the whole body — attachments are bounded (typical case
    is a 10 KB binary or a small text file). For huge captures the
    caller should hand the player a CDN URL instead.

    *filename* is percent-encoded before being placed in the URL
    path, so names containing ``#``, ``?`` or a literal ``%`` reach
    the node intact instead of being parsed as a fragment, a query
    string or an escape sequence. ``/`` is left unescaped so the
    request path for names with separators is unchanged.

    The content type comes from the response's ``content-type``
    header and defaults to ``application/octet-stream`` when the
    header is absent.

    Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
    proxy route catches 404 and re-emits to the player, anything
    else is a 502.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    # NOTE(review): assumes ``self.request`` passes the path through
    # without quoting it again — confirm against BaseHttpClient.
    encoded_name = quote(filename, safe="/")
    resp = self.request(
        "GET",
        f"/instances/{instance_id}/attachments/{encoded_name}",
    )
    resp.raise_for_status()
    return resp.content, resp.headers.get("content-type", "application/octet-stream")

Download one per-instance attachment as (bytes, content_type).

Buffers the whole body — attachments are bounded (typical case is a 10 KB binary or a small text file). For huge captures the caller should hand the player a CDN URL instead.

Raises httpx.HTTPStatusError on non-2xx — the platform's proxy route catches 404 and re-emits to the player, anything else is a 502.