ctfy.sdk.resources.instances

client.instances — launch, control, status, attachments, traffic.

Flat /instances/{id} ops plus the per-competition scoped twins (*_scoped) for callers who play in more than one competition and must address an instance unambiguously.

  1"""``client.instances`` — launch, control, status, attachments, traffic.
  2
  3Flat ``/instances/{id}`` ops plus the per-competition scoped twins
  4(``*_scoped``) for callers who play in more than one competition and must
  5address an instance unambiguously.
  6"""
  7
  8from __future__ import annotations
  9
 10import builtins
 11from collections.abc import Iterator
 12from typing import Any
 13
 14from ctfy.sdk._helpers import (
 15    InstanceReadyResult,
 16    _extract_items,
 17    _poll_instance_ready,
 18    _raise_for_status,
 19)
 20from ctfy.sdk.base import BaseHttpClient
 21from ctfy.server.models import (
 22    AttachmentList,
 23    InstanceInfo,
 24    InstanceQuestionInfo,
 25    InstanceStatusResponse,
 26    RenewResponse,
 27    StartResponse,
 28)
 29
 30
 31class InstancesResource:
 32    """Start / inspect / tear down challenge instances."""
 33
 34    def __init__(self, http: BaseHttpClient) -> None:
 35        self._http = http
 36
 37    def start(
 38        self,
 39        challenge_id: str,
 40        ttl: int | None = None,
 41        *,
 42        competition_id: str = "",
 43        timeout: int = 300,
 44        poll_interval: float = 2.0,
 45        proxy_output_dir: str | None = None,
 46    ) -> InstanceReadyResult:
 47        """Start instance and wait until ready.
 48
 49        Returns InstanceReadyResult with attack surface + sandbox network info.
 50        The .surface attribute provides backward compatibility.
 51
 52        Args:
 53            challenge_id: Challenge ID (e.g., "XBOW-047").
 54            ttl: Per-instance TTL override in seconds. ``None`` (the
 55                default) delegates to the platform's
 56                ``default_instance_ttl_s`` setting (admin-tunable,
 57                currently 24h). Explicit values are clamped to the
 58                platform's ``max_instance_ttl_s``.
 59            competition_id: Which competition to spin the instance up
 60                under. Required when the calling user is on more than
 61                one team — agents tied to a single comp can leave it
 62                empty and the server picks the unambiguous team.
 63            timeout: Max seconds to wait for ready.
 64            poll_interval: Seconds between status polls.
 65            proxy_output_dir: Host path to store proxy traffic captures.
 66        """
 67        body: dict[str, Any] = {
 68            "challenge_id": challenge_id,
 69            "competition_id": competition_id,
 70        }
 71        if ttl is not None:
 72            body["ttl"] = ttl
 73        if proxy_output_dir:
 74            body["proxy_output_dir"] = proxy_output_dir
 75        resp = self._http.request("POST", "/instances", json=body)
 76        _raise_for_status(resp)
 77        start = StartResponse.model_validate(resp.json())
 78
 79        return _poll_instance_ready(
 80            self._http._client, start.id, timeout=timeout, poll_interval=poll_interval
 81        )
 82
 83    def get(self, instance_id: str) -> InstanceInfo:
 84        """Get the full ``InstanceInfo`` for one running instance.
 85
 86        Carries the per-instance ``questions`` list with each question's
 87        ``unlocked`` / ``answered_correctly`` / ``attempts_remaining``
 88        state — the surface multi-milestone solvers iterate on. The
 89        prompt is empty for still-locked questions so the route hint
 90        doesn't leak before the prerequisite is solved.
 91
 92        See :meth:`status` for the lighter status-only payload (no
 93        questions, no spec metadata) used by polling loops.
 94        """
 95        resp = self._http.request("GET", f"/instances/{instance_id}")
 96        _raise_for_status(resp)
 97        return InstanceInfo.model_validate(resp.json())
 98
 99    def iter_pending_questions(self, instance_id: str) -> Iterator[InstanceQuestionInfo]:
100        """Yield each currently-pending question on a running instance.
101
102        "Pending" = unlocked (the ``requires:`` chain is satisfied)
103        and not yet correctly answered by the calling team. After
104        every batch is exhausted, re-fetches ``InstanceInfo`` so
105        newly-unlocked questions (whose prerequisite was just solved
106        by the consumer) flow through on the next iteration.
107
108        Terminates on either:
109
110        * no pending questions left — the natural "challenge fully
111          solved" exit;
112        * **no progress** — the same set of pending question ids
113          comes back twice in a row, meaning the consumer's solver
114          isn't capturing anything. Stops silently rather than
115          infinite-looping; inspect the caller's last :meth:`get` to
116          see what's still stuck.
117
118        This is the ergonomic helper for the common multi-milestone
119        loop::
120
121            for q in client.instances.iter_pending_questions(instance_id):
122                ans = my_solver(q.prompt)
123                client.submissions.submit(instance_id, ans, question_id=q.id)
124
125        Args:
126            instance_id: The instance to walk.
127        """
128        previous_pending_ids: frozenset[str] | None = None
129        while True:
130            info = self.get(instance_id)
131            pending = [q for q in info.questions if q.unlocked and not q.answered_correctly]
132            if not pending:
133                return
134
135            pending_ids = frozenset(q.id for q in pending)
136            if pending_ids == previous_pending_ids:
137                # Solver made no progress against the last batch — stop
138                # rather than spin. The consumer can re-call this method
139                # later (after fixing their solver) to pick up where
140                # they left off.
141                return
142            previous_pending_ids = pending_ids
143
144            yield from pending
145
146    def status(self, instance_id: str) -> InstanceStatusResponse:
147        """Get instance status and attack surface."""
148        resp = self._http.request("GET", f"/instances/{instance_id}/status")
149        _raise_for_status(resp)
150        return InstanceStatusResponse.model_validate(resp.json())
151
152    def stop(self, instance_id: str) -> None:
153        """Tear down ``instance_id``. Idempotent: a 404 from the server
154        means the instance is already gone (auto-stopped after solve,
155        TTL expiry, admin teardown) and is treated as success."""
156        resp = self._http.request("DELETE", f"/instances/{instance_id}")
157        if resp.status_code != 404:
158            _raise_for_status(resp)
159
160    def renew(self, instance_id: str, ttl: int | None = None) -> RenewResponse:
161        """Extend the TTL of a running instance.
162
163        ``ttl=None`` (the default) delegates to the platform's
164        ``default_instance_ttl_s`` setting. Explicit values are
165        clamped to ``max_instance_ttl_s``.
166        """
167        params: dict[str, Any] = {}
168        if ttl is not None:
169            params["ttl"] = ttl
170        resp = self._http.request("POST", f"/instances/{instance_id}/renew", params=params)
171        _raise_for_status(resp)
172        return RenewResponse.model_validate(resp.json())
173
174    def list(
175        self,
176        challenge_id: str = "",
177        status: str = "",
178        q: str = "",
179        offset: int = 0,
180        limit: int = 50,
181    ) -> builtins.list[InstanceInfo]:
182        """List all running instances."""
183        params: dict[str, Any] = {"offset": offset, "limit": limit}
184        if challenge_id:
185            params["challenge_id"] = challenge_id
186        if status:
187            params["status"] = status
188        if q:
189            params["q"] = q
190        resp = self._http.request("GET", "/instances", params=params)
191        _raise_for_status(resp)
192        return _extract_items(resp.json(), InstanceInfo)
193
194    def attachments(self, instance_id: str) -> AttachmentList:
195        """List per-instance (post-launch) attachments.
196
197        For challenges with ``attachments/<name>.tpl`` Jinja templates
198        the names + sizes here are the *rendered* per-team-unique
199        siblings; for ones with only static attachments the list is
200        the same as :meth:`ChallengesResource.attachments`.
201        """
202        resp = self._http.request("GET", f"/instances/{instance_id}/attachments")
203        _raise_for_status(resp)
204        return AttachmentList.model_validate(resp.json())
205
206    def download_attachment(self, instance_id: str, filename: str) -> bytes:
207        """Download one per-instance attachment as raw bytes.
208
209        For challenges with per-team Jinja templates this returns the
210        team-specific render; for static attachments it's the same
211        bytes as :meth:`ChallengesResource.download_attachment` would yield.
212        """
213        resp = self._http.request("GET", f"/instances/{instance_id}/attachments/{filename}")
214        _raise_for_status(resp)
215        return resp.content
216
217    def openvpn_config(self, instance_id: str) -> bytes:
218        """Download the per-instance OpenVPN client config as raw bytes.
219
220        Only meaningful for challenges that declare
221        ``network_topology: engagement`` in metadata.yaml. The body is
222        the rendered ``.ovpn`` file the player or automated agent
223        passes to ``openvpn --config …`` to land on the challenge's
224        DMZ docker network. The platform serves a 404 for simple-mode
225        instances.
226
227        Raises a ``CTFyError`` (404) when the challenge isn't an
228        engagement-mode one, when the instance hasn't yet reached the
229        running state, or when the node's openvpn container hasn't
230        finished its PKI bootstrap.
231        """
232        resp = self._http.request("GET", f"/instances/{instance_id}/openvpn-config")
233        _raise_for_status(resp)
234        return resp.content
235
236    def traffic(self, instance_id: str, *, competition_id: str = "") -> dict[str, Any]:
237        """Parsed mitmproxy traffic for a running instance. The flow
238        file lives on the worker node; the platform proxies the fetch.
239
240        Returns an empty dict when no capture exists (sidecar disabled,
241        node offline, instance never reached running) so callers can
242        persist conditionally without try/except.
243        """
244        params = {"competition_id": competition_id} if competition_id else {}
245        resp = self._http.request("GET", f"/instances/{instance_id}/traffic", params=params)
246        _raise_for_status(resp)
247        data = resp.json()
248        return data if isinstance(data, dict) else {}
class InstancesResource:
 32class InstancesResource:
 33    """Start / inspect / tear down challenge instances."""
 34
 35    def __init__(self, http: BaseHttpClient) -> None:
 36        self._http = http
 37
 38    def start(
 39        self,
 40        challenge_id: str,
 41        ttl: int | None = None,
 42        *,
 43        competition_id: str = "",
 44        timeout: int = 300,
 45        poll_interval: float = 2.0,
 46        proxy_output_dir: str | None = None,
 47    ) -> InstanceReadyResult:
 48        """Start instance and wait until ready.
 49
 50        Returns InstanceReadyResult with attack surface + sandbox network info.
 51        The .surface attribute provides backward compatibility.
 52
 53        Args:
 54            challenge_id: Challenge ID (e.g., "XBOW-047").
 55            ttl: Per-instance TTL override in seconds. ``None`` (the
 56                default) delegates to the platform's
 57                ``default_instance_ttl_s`` setting (admin-tunable,
 58                currently 24h). Explicit values are clamped to the
 59                platform's ``max_instance_ttl_s``.
 60            competition_id: Which competition to spin the instance up
 61                under. Required when the calling user is on more than
 62                one team — agents tied to a single comp can leave it
 63                empty and the server picks the unambiguous team.
 64            timeout: Max seconds to wait for ready.
 65            poll_interval: Seconds between status polls.
 66            proxy_output_dir: Host path to store proxy traffic captures.
 67        """
 68        body: dict[str, Any] = {
 69            "challenge_id": challenge_id,
 70            "competition_id": competition_id,
 71        }
 72        if ttl is not None:
 73            body["ttl"] = ttl
 74        if proxy_output_dir:
 75            body["proxy_output_dir"] = proxy_output_dir
 76        resp = self._http.request("POST", "/instances", json=body)
 77        _raise_for_status(resp)
 78        start = StartResponse.model_validate(resp.json())
 79
 80        return _poll_instance_ready(
 81            self._http._client, start.id, timeout=timeout, poll_interval=poll_interval
 82        )
 83
 84    def get(self, instance_id: str) -> InstanceInfo:
 85        """Get the full ``InstanceInfo`` for one running instance.
 86
 87        Carries the per-instance ``questions`` list with each question's
 88        ``unlocked`` / ``answered_correctly`` / ``attempts_remaining``
 89        state — the surface multi-milestone solvers iterate on. The
 90        prompt is empty for still-locked questions so the route hint
 91        doesn't leak before the prerequisite is solved.
 92
 93        See :meth:`status` for the lighter status-only payload (no
 94        questions, no spec metadata) used by polling loops.
 95        """
 96        resp = self._http.request("GET", f"/instances/{instance_id}")
 97        _raise_for_status(resp)
 98        return InstanceInfo.model_validate(resp.json())
 99
100    def iter_pending_questions(self, instance_id: str) -> Iterator[InstanceQuestionInfo]:
101        """Yield each currently-pending question on a running instance.
102
103        "Pending" = unlocked (the ``requires:`` chain is satisfied)
104        and not yet correctly answered by the calling team. After
105        every batch is exhausted, re-fetches ``InstanceInfo`` so
106        newly-unlocked questions (whose prerequisite was just solved
107        by the consumer) flow through on the next iteration.
108
109        Terminates on either:
110
111        * no pending questions left — the natural "challenge fully
112          solved" exit;
113        * **no progress** — the same set of pending question ids
114          comes back twice in a row, meaning the consumer's solver
115          isn't capturing anything. Stops silently rather than
116          infinite-looping; inspect the caller's last :meth:`get` to
117          see what's still stuck.
118
119        This is the ergonomic helper for the common multi-milestone
120        loop::
121
122            for q in client.instances.iter_pending_questions(instance_id):
123                ans = my_solver(q.prompt)
124                client.submissions.submit(instance_id, ans, question_id=q.id)
125
126        Args:
127            instance_id: The instance to walk.
128        """
129        previous_pending_ids: frozenset[str] | None = None
130        while True:
131            info = self.get(instance_id)
132            pending = [q for q in info.questions if q.unlocked and not q.answered_correctly]
133            if not pending:
134                return
135
136            pending_ids = frozenset(q.id for q in pending)
137            if pending_ids == previous_pending_ids:
138                # Solver made no progress against the last batch — stop
139                # rather than spin. The consumer can re-call this method
140                # later (after fixing their solver) to pick up where
141                # they left off.
142                return
143            previous_pending_ids = pending_ids
144
145            yield from pending
146
147    def status(self, instance_id: str) -> InstanceStatusResponse:
148        """Get instance status and attack surface."""
149        resp = self._http.request("GET", f"/instances/{instance_id}/status")
150        _raise_for_status(resp)
151        return InstanceStatusResponse.model_validate(resp.json())
152
153    def stop(self, instance_id: str) -> None:
154        """Tear down ``instance_id``. Idempotent: a 404 from the server
155        means the instance is already gone (auto-stopped after solve,
156        TTL expiry, admin teardown) and is treated as success."""
157        resp = self._http.request("DELETE", f"/instances/{instance_id}")
158        if resp.status_code != 404:
159            _raise_for_status(resp)
160
161    def renew(self, instance_id: str, ttl: int | None = None) -> RenewResponse:
162        """Extend the TTL of a running instance.
163
164        ``ttl=None`` (the default) delegates to the platform's
165        ``default_instance_ttl_s`` setting. Explicit values are
166        clamped to ``max_instance_ttl_s``.
167        """
168        params: dict[str, Any] = {}
169        if ttl is not None:
170            params["ttl"] = ttl
171        resp = self._http.request("POST", f"/instances/{instance_id}/renew", params=params)
172        _raise_for_status(resp)
173        return RenewResponse.model_validate(resp.json())
174
175    def list(
176        self,
177        challenge_id: str = "",
178        status: str = "",
179        q: str = "",
180        offset: int = 0,
181        limit: int = 50,
182    ) -> builtins.list[InstanceInfo]:
183        """List all running instances."""
184        params: dict[str, Any] = {"offset": offset, "limit": limit}
185        if challenge_id:
186            params["challenge_id"] = challenge_id
187        if status:
188            params["status"] = status
189        if q:
190            params["q"] = q
191        resp = self._http.request("GET", "/instances", params=params)
192        _raise_for_status(resp)
193        return _extract_items(resp.json(), InstanceInfo)
194
195    def attachments(self, instance_id: str) -> AttachmentList:
196        """List per-instance (post-launch) attachments.
197
198        For challenges with ``attachments/<name>.tpl`` Jinja templates
199        the names + sizes here are the *rendered* per-team-unique
200        siblings; for ones with only static attachments the list is
201        the same as :meth:`ChallengesResource.attachments`.
202        """
203        resp = self._http.request("GET", f"/instances/{instance_id}/attachments")
204        _raise_for_status(resp)
205        return AttachmentList.model_validate(resp.json())
206
207    def download_attachment(self, instance_id: str, filename: str) -> bytes:
208        """Download one per-instance attachment as raw bytes.
209
210        For challenges with per-team Jinja templates this returns the
211        team-specific render; for static attachments it's the same
212        bytes as :meth:`ChallengesResource.download_attachment` would yield.
213        """
214        resp = self._http.request("GET", f"/instances/{instance_id}/attachments/{filename}")
215        _raise_for_status(resp)
216        return resp.content
217
218    def openvpn_config(self, instance_id: str) -> bytes:
219        """Download the per-instance OpenVPN client config as raw bytes.
220
221        Only meaningful for challenges that declare
222        ``network_topology: engagement`` in metadata.yaml. The body is
223        the rendered ``.ovpn`` file the player or automated agent
224        passes to ``openvpn --config …`` to land on the challenge's
225        DMZ docker network. The platform serves a 404 for simple-mode
226        instances.
227
228        Raises a ``CTFyError`` (404) when the challenge isn't an
229        engagement-mode one, when the instance hasn't yet reached the
230        running state, or when the node's openvpn container hasn't
231        finished its PKI bootstrap.
232        """
233        resp = self._http.request("GET", f"/instances/{instance_id}/openvpn-config")
234        _raise_for_status(resp)
235        return resp.content
236
237    def traffic(self, instance_id: str, *, competition_id: str = "") -> dict[str, Any]:
238        """Parsed mitmproxy traffic for a running instance. The flow
239        file lives on the worker node; the platform proxies the fetch.
240
241        Returns an empty dict when no capture exists (sidecar disabled,
242        node offline, instance never reached running) so callers can
243        persist conditionally without try/except.
244        """
245        params = {"competition_id": competition_id} if competition_id else {}
246        resp = self._http.request("GET", f"/instances/{instance_id}/traffic", params=params)
247        _raise_for_status(resp)
248        data = resp.json()
249        return data if isinstance(data, dict) else {}

Start / inspect / tear down challenge instances.

InstancesResource(http: ctfy.sdk.base.BaseHttpClient)
35    def __init__(self, http: BaseHttpClient) -> None:
36        self._http = http
def start( self, challenge_id: str, ttl: int | None = None, *, competition_id: str = '', timeout: int = 300, poll_interval: float = 2.0, proxy_output_dir: str | None = None) -> ctfy.sdk._helpers.InstanceReadyResult:
38    def start(
39        self,
40        challenge_id: str,
41        ttl: int | None = None,
42        *,
43        competition_id: str = "",
44        timeout: int = 300,
45        poll_interval: float = 2.0,
46        proxy_output_dir: str | None = None,
47    ) -> InstanceReadyResult:
48        """Start instance and wait until ready.
49
50        Returns InstanceReadyResult with attack surface + sandbox network info.
51        The .surface attribute provides backward compatibility.
52
53        Args:
54            challenge_id: Challenge ID (e.g., "XBOW-047").
55            ttl: Per-instance TTL override in seconds. ``None`` (the
56                default) delegates to the platform's
57                ``default_instance_ttl_s`` setting (admin-tunable,
58                currently 24h). Explicit values are clamped to the
59                platform's ``max_instance_ttl_s``.
60            competition_id: Which competition to spin the instance up
61                under. Required when the calling user is on more than
62                one team — agents tied to a single comp can leave it
63                empty and the server picks the unambiguous team.
64            timeout: Max seconds to wait for ready.
65            poll_interval: Seconds between status polls.
66            proxy_output_dir: Host path to store proxy traffic captures.
67        """
68        body: dict[str, Any] = {
69            "challenge_id": challenge_id,
70            "competition_id": competition_id,
71        }
72        if ttl is not None:
73            body["ttl"] = ttl
74        if proxy_output_dir:
75            body["proxy_output_dir"] = proxy_output_dir
76        resp = self._http.request("POST", "/instances", json=body)
77        _raise_for_status(resp)
78        start = StartResponse.model_validate(resp.json())
79
80        return _poll_instance_ready(
81            self._http._client, start.id, timeout=timeout, poll_interval=poll_interval
82        )

Start instance and wait until ready.

Returns InstanceReadyResult with attack surface + sandbox network info. The .surface attribute provides backward compatibility.

Arguments:
  • challenge_id: Challenge ID (e.g., "XBOW-047").
  • ttl: Per-instance TTL override in seconds. None (the default) delegates to the platform's default_instance_ttl_s setting (admin-tunable, currently 24h). Explicit values are clamped to the platform's max_instance_ttl_s.
  • competition_id: Which competition to spin the instance up under. Required when the calling user is on more than one team — agents tied to a single comp can leave it empty and the server picks the unambiguous team.
  • timeout: Max seconds to wait for ready.
  • poll_interval: Seconds between status polls.
  • proxy_output_dir: Host path to store proxy traffic captures.
def get(self, instance_id: str) -> ctfy.server.models.InstanceInfo:
84    def get(self, instance_id: str) -> InstanceInfo:
85        """Get the full ``InstanceInfo`` for one running instance.
86
87        Carries the per-instance ``questions`` list with each question's
88        ``unlocked`` / ``answered_correctly`` / ``attempts_remaining``
89        state — the surface multi-milestone solvers iterate on. The
90        prompt is empty for still-locked questions so the route hint
91        doesn't leak before the prerequisite is solved.
92
93        See :meth:`status` for the lighter status-only payload (no
94        questions, no spec metadata) used by polling loops.
95        """
96        resp = self._http.request("GET", f"/instances/{instance_id}")
97        _raise_for_status(resp)
98        return InstanceInfo.model_validate(resp.json())

Get the full InstanceInfo for one running instance.

Carries the per-instance questions list with each question's unlocked / answered_correctly / attempts_remaining state — the surface multi-milestone solvers iterate on. The prompt is empty for still-locked questions so the route hint doesn't leak before the prerequisite is solved.

See status() for the lighter status-only payload (no questions, no spec metadata) used by polling loops.

def iter_pending_questions( self, instance_id: str) -> Iterator[ctfy.server.models.InstanceQuestionInfo]:
100    def iter_pending_questions(self, instance_id: str) -> Iterator[InstanceQuestionInfo]:
101        """Yield each currently-pending question on a running instance.
102
103        "Pending" = unlocked (the ``requires:`` chain is satisfied)
104        and not yet correctly answered by the calling team. After
105        every batch is exhausted, re-fetches ``InstanceInfo`` so
106        newly-unlocked questions (whose prerequisite was just solved
107        by the consumer) flow through on the next iteration.
108
109        Terminates on either:
110
111        * no pending questions left — the natural "challenge fully
112          solved" exit;
113        * **no progress** — the same set of pending question ids
114          comes back twice in a row, meaning the consumer's solver
115          isn't capturing anything. Stops silently rather than
116          infinite-looping; inspect the caller's last :meth:`get` to
117          see what's still stuck.
118
119        This is the ergonomic helper for the common multi-milestone
120        loop::
121
122            for q in client.instances.iter_pending_questions(instance_id):
123                ans = my_solver(q.prompt)
124                client.submissions.submit(instance_id, ans, question_id=q.id)
125
126        Args:
127            instance_id: The instance to walk.
128        """
129        previous_pending_ids: frozenset[str] | None = None
130        while True:
131            info = self.get(instance_id)
132            pending = [q for q in info.questions if q.unlocked and not q.answered_correctly]
133            if not pending:
134                return
135
136            pending_ids = frozenset(q.id for q in pending)
137            if pending_ids == previous_pending_ids:
138                # Solver made no progress against the last batch — stop
139                # rather than spin. The consumer can re-call this method
140                # later (after fixing their solver) to pick up where
141                # they left off.
142                return
143            previous_pending_ids = pending_ids
144
145            yield from pending

Yield each currently-pending question on a running instance.

"Pending" = unlocked (the requires: chain is satisfied) and not yet correctly answered by the calling team. After every batch is exhausted, re-fetches InstanceInfo so newly-unlocked questions (whose prerequisite was just solved by the consumer) flow through on the next iteration.

Terminates on either:

  • no pending questions left — the natural "challenge fully solved" exit;
  • no progress — the same set of pending question ids comes back twice in a row, meaning the consumer's solver isn't capturing anything. Stops silently rather than infinite-looping; inspect the caller's last get() to see what's still stuck.

This is the ergonomic helper for the common multi-milestone loop::

for q in client.instances.iter_pending_questions(instance_id):
    ans = my_solver(q.prompt)
    client.submissions.submit(instance_id, ans, question_id=q.id)
Arguments:
  • instance_id: The instance to walk.
def status( self, instance_id: str) -> ctfy.server.models.InstanceStatusResponse:
147    def status(self, instance_id: str) -> InstanceStatusResponse:
148        """Get instance status and attack surface."""
149        resp = self._http.request("GET", f"/instances/{instance_id}/status")
150        _raise_for_status(resp)
151        return InstanceStatusResponse.model_validate(resp.json())

Get instance status and attack surface.

def stop(self, instance_id: str) -> None:
153    def stop(self, instance_id: str) -> None:
154        """Tear down ``instance_id``. Idempotent: a 404 from the server
155        means the instance is already gone (auto-stopped after solve,
156        TTL expiry, admin teardown) and is treated as success."""
157        resp = self._http.request("DELETE", f"/instances/{instance_id}")
158        if resp.status_code != 404:
159            _raise_for_status(resp)

Tear down instance_id. Idempotent: a 404 from the server means the instance is already gone (auto-stopped after solve, TTL expiry, admin teardown) and is treated as success.

def renew( self, instance_id: str, ttl: int | None = None) -> ctfy.server.models.RenewResponse:
161    def renew(self, instance_id: str, ttl: int | None = None) -> RenewResponse:
162        """Extend the TTL of a running instance.
163
164        ``ttl=None`` (the default) delegates to the platform's
165        ``default_instance_ttl_s`` setting. Explicit values are
166        clamped to ``max_instance_ttl_s``.
167        """
168        params: dict[str, Any] = {}
169        if ttl is not None:
170            params["ttl"] = ttl
171        resp = self._http.request("POST", f"/instances/{instance_id}/renew", params=params)
172        _raise_for_status(resp)
173        return RenewResponse.model_validate(resp.json())

Extend the TTL of a running instance.

ttl=None (the default) delegates to the platform's default_instance_ttl_s setting. Explicit values are clamped to max_instance_ttl_s.

def list( self, challenge_id: str = '', status: str = '', q: str = '', offset: int = 0, limit: int = 50) -> list[ctfy.server.models.InstanceInfo]:
175    def list(
176        self,
177        challenge_id: str = "",
178        status: str = "",
179        q: str = "",
180        offset: int = 0,
181        limit: int = 50,
182    ) -> builtins.list[InstanceInfo]:
183        """List all running instances."""
184        params: dict[str, Any] = {"offset": offset, "limit": limit}
185        if challenge_id:
186            params["challenge_id"] = challenge_id
187        if status:
188            params["status"] = status
189        if q:
190            params["q"] = q
191        resp = self._http.request("GET", "/instances", params=params)
192        _raise_for_status(resp)
193        return _extract_items(resp.json(), InstanceInfo)

List all running instances.

def attachments(self, instance_id: str) -> ctfy.server.models.AttachmentList:
195    def attachments(self, instance_id: str) -> AttachmentList:
196        """List per-instance (post-launch) attachments.
197
198        For challenges with ``attachments/<name>.tpl`` Jinja templates
199        the names + sizes here are the *rendered* per-team-unique
200        siblings; for ones with only static attachments the list is
201        the same as :meth:`ChallengesResource.attachments`.
202        """
203        resp = self._http.request("GET", f"/instances/{instance_id}/attachments")
204        _raise_for_status(resp)
205        return AttachmentList.model_validate(resp.json())

List per-instance (post-launch) attachments.

For challenges with attachments/<name>.tpl Jinja templates the names + sizes here are the rendered per-team-unique siblings; for ones with only static attachments the list is the same as ChallengesResource.attachments().

def download_attachment(self, instance_id: str, filename: str) -> bytes:
207    def download_attachment(self, instance_id: str, filename: str) -> bytes:
208        """Download one per-instance attachment as raw bytes.
209
210        For challenges with per-team Jinja templates this returns the
211        team-specific render; for static attachments it's the same
212        bytes as :meth:`ChallengesResource.download_attachment` would yield.
213        """
214        resp = self._http.request("GET", f"/instances/{instance_id}/attachments/{filename}")
215        _raise_for_status(resp)
216        return resp.content

Download one per-instance attachment as raw bytes.

For challenges with per-team Jinja templates this returns the team-specific render; for static attachments it's the same bytes as ChallengesResource.download_attachment() would yield.

def openvpn_config(self, instance_id: str) -> bytes:
218    def openvpn_config(self, instance_id: str) -> bytes:
219        """Download the per-instance OpenVPN client config as raw bytes.
220
221        Only meaningful for challenges that declare
222        ``network_topology: engagement`` in metadata.yaml. The body is
223        the rendered ``.ovpn`` file the player or automated agent
224        passes to ``openvpn --config …`` to land on the challenge's
225        DMZ docker network. The platform serves a 404 for simple-mode
226        instances.
227
228        Raises a ``CTFyError`` (404) when the challenge isn't an
229        engagement-mode one, when the instance hasn't yet reached the
230        running state, or when the node's openvpn container hasn't
231        finished its PKI bootstrap.
232        """
233        resp = self._http.request("GET", f"/instances/{instance_id}/openvpn-config")
234        _raise_for_status(resp)
235        return resp.content

Download the per-instance OpenVPN client config as raw bytes.

Only meaningful for challenges that declare network_topology: engagement in metadata.yaml. The body is the rendered .ovpn file the player or automated agent passes to openvpn --config … to land on the challenge's DMZ docker network. The platform serves a 404 for simple-mode instances.

Raises a CTFyError (404) when the challenge isn't an engagement-mode one, when the instance hasn't yet reached the running state, or when the node's openvpn container hasn't finished its PKI bootstrap.

def traffic( self, instance_id: str, *, competition_id: str = '') -> dict[str, typing.Any]:
237    def traffic(self, instance_id: str, *, competition_id: str = "") -> dict[str, Any]:
238        """Parsed mitmproxy traffic for a running instance. The flow
239        file lives on the worker node; the platform proxies the fetch.
240
241        Returns an empty dict when no capture exists (sidecar disabled,
242        node offline, instance never reached running) so callers can
243        persist conditionally without try/except.
244        """
245        params = {"competition_id": competition_id} if competition_id else {}
246        resp = self._http.request("GET", f"/instances/{instance_id}/traffic", params=params)
247        _raise_for_status(resp)
248        data = resp.json()
249        return data if isinstance(data, dict) else {}

Parsed mitmproxy traffic for a running instance. The flow file lives on the worker node; the platform proxies the fetch.

Returns an empty dict when no capture exists (sidecar disabled, node offline, instance never reached running) so callers can persist conditionally without try/except.