ctfy.sdk.resources.instances
client.instances — launch, control, status, attachments, traffic.
Flat /instances/{id} ops plus the per-competition scoped twins
(*_scoped) for callers who play in more than one competition and must
address an instance unambiguously.
"""``client.instances`` — launch, control, status, attachments, traffic.

Flat ``/instances/{id}`` ops plus the per-competition scoped twins
(``*_scoped``) for callers who play in more than one competition and must
address an instance unambiguously.
"""

from __future__ import annotations

import builtins
from collections.abc import Iterator
from typing import Any

from ctfy.sdk._helpers import (
    InstanceReadyResult,
    _extract_items,
    _poll_instance_ready,
    _raise_for_status,
)
from ctfy.sdk.base import BaseHttpClient
from ctfy.server.models import (
    AttachmentList,
    InstanceInfo,
    InstanceQuestionInfo,
    InstanceStatusResponse,
    RenewResponse,
    StartResponse,
)


class InstancesResource:
    """Launch, inspect and tear down challenge instances."""

    def __init__(self, http: BaseHttpClient) -> None:
        """Bind the resource to the HTTP client used for every request."""
        self._http = http

    def start(
        self,
        challenge_id: str,
        ttl: int | None = None,
        *,
        competition_id: str = "",
        timeout: int = 300,
        poll_interval: float = 2.0,
        proxy_output_dir: str | None = None,
    ) -> InstanceReadyResult:
        """Launch a challenge instance and block until it reports ready.

        POSTs the launch request to ``/instances`` and then polls the
        newly created instance id.

        Args:
            challenge_id: Challenge ID (e.g., "XBOW-047").
            ttl: Per-instance TTL override in seconds. ``None`` (the
                default) is left out of the request so the platform's
                ``default_instance_ttl_s`` setting applies
                (admin-tunable, currently 24h). Explicit values are
                clamped to the platform's ``max_instance_ttl_s``.
            competition_id: Which competition to spin the instance up
                under. Required when the calling user is on more than
                one team — agents tied to a single comp can leave it
                empty and the server picks the unambiguous team.
            timeout: Max seconds to wait for ready.
            poll_interval: Seconds between status polls.
            proxy_output_dir: Host path to store proxy traffic
                captures. Only sent when non-empty.

        Returns:
            ``InstanceReadyResult`` with attack surface + sandbox
            network info. Its ``.surface`` attribute provides backward
            compatibility.
        """
        payload: dict[str, Any] = {"challenge_id": challenge_id, "competition_id": competition_id}
        # Optional fields are omitted entirely so the server applies its defaults.
        if ttl is not None:
            payload["ttl"] = ttl
        if proxy_output_dir:
            payload["proxy_output_dir"] = proxy_output_dir

        response = self._http.request("POST", "/instances", json=payload)
        _raise_for_status(response)
        launched = StartResponse.model_validate(response.json())
        return _poll_instance_ready(
            self._http._client,
            launched.id,
            timeout=timeout,
            poll_interval=poll_interval,
        )

    def get(self, instance_id: str) -> InstanceInfo:
        """Fetch the full ``InstanceInfo`` for one running instance.

        The payload carries the per-instance ``questions`` list with
        each question's ``unlocked`` / ``answered_correctly`` /
        ``attempts_remaining`` state — the surface multi-milestone
        solvers iterate on. The prompt is empty for still-locked
        questions so the route hint doesn't leak before the
        prerequisite is solved.

        See :meth:`status` for the lighter status-only payload (no
        questions, no spec metadata) used by polling loops.
        """
        response = self._http.request("GET", f"/instances/{instance_id}")
        _raise_for_status(response)
        payload = response.json()
        return InstanceInfo.model_validate(payload)

    def iter_pending_questions(self, instance_id: str) -> Iterator[InstanceQuestionInfo]:
        """Yield every currently-pending question on a running instance.

        A question is "pending" when it is unlocked (its ``requires:``
        chain is satisfied) and not yet correctly answered by the
        calling team. Once a batch has been fully consumed the
        ``InstanceInfo`` is fetched again, so questions unlocked by the
        consumer's own answers show up in the following batch.

        Iteration ends when either:

        * nothing is pending any more — the natural "challenge fully
          solved" exit;
        * **no progress** was made — the same set of pending question
          ids comes back twice in a row, meaning the consumer's solver
          isn't capturing anything. The generator stops silently
          rather than looping forever; call :meth:`get` to see what is
          still stuck.

        Typical multi-milestone loop::

            for q in client.instances.iter_pending_questions(instance_id):
                ans = my_solver(q.prompt)
                client.submissions.submit(instance_id, ans, question_id=q.id)

        Args:
            instance_id: The instance to walk.
        """
        last_seen_ids: frozenset[str] | None = None
        while True:
            snapshot = self.get(instance_id)
            batch = []
            for question in snapshot.questions:
                if not question.unlocked or question.answered_correctly:
                    continue
                batch.append(question)

            batch_ids = frozenset(question.id for question in batch)
            # Stop when everything is solved, or when the previous batch
            # came back unchanged (the solver made no progress). The
            # consumer can call this method again later to resume.
            if not batch or batch_ids == last_seen_ids:
                return
            last_seen_ids = batch_ids

            yield from batch

    def status(self, instance_id: str) -> InstanceStatusResponse:
        """Fetch instance status and attack surface."""
        response = self._http.request("GET", f"/instances/{instance_id}/status")
        _raise_for_status(response)
        payload = response.json()
        return InstanceStatusResponse.model_validate(payload)

    def stop(self, instance_id: str) -> None:
        """Tear down ``instance_id``.

        Idempotent: a 404 from the server means the instance is already
        gone (auto-stopped after solve, TTL expiry, admin teardown) and
        is treated as success.
        """
        response = self._http.request("DELETE", f"/instances/{instance_id}")
        if response.status_code == 404:
            # Already gone — nothing left to tear down.
            return
        _raise_for_status(response)

    def renew(self, instance_id: str, ttl: int | None = None) -> RenewResponse:
        """Extend the TTL of a running instance.

        With ``ttl=None`` (the default) no ``ttl`` parameter is sent and
        the platform's ``default_instance_ttl_s`` setting applies.
        Explicit values are clamped to ``max_instance_ttl_s``.
        """
        query: dict[str, Any] = {} if ttl is None else {"ttl": ttl}
        response = self._http.request("POST", f"/instances/{instance_id}/renew", params=query)
        _raise_for_status(response)
        return RenewResponse.model_validate(response.json())

    def list(
        self,
        challenge_id: str = "",
        status: str = "",
        q: str = "",
        offset: int = 0,
        limit: int = 50,
    ) -> builtins.list[InstanceInfo]:
        """List running instances, one page at a time.

        ``offset`` and ``limit`` are always sent; ``challenge_id``,
        ``status`` and ``q`` are only forwarded when non-empty.
        """
        query: dict[str, Any] = {"offset": offset, "limit": limit}
        optional_filters = {"challenge_id": challenge_id, "status": status, "q": q}
        query.update({name: value for name, value in optional_filters.items() if value})

        response = self._http.request("GET", "/instances", params=query)
        _raise_for_status(response)
        return _extract_items(response.json(), InstanceInfo)

    def attachments(self, instance_id: str) -> AttachmentList:
        """List the per-instance (post-launch) attachments.

        For challenges with ``attachments/<name>.tpl`` Jinja templates
        the names + sizes here are the *rendered* per-team-unique
        siblings; for ones with only static attachments the list is
        the same as :meth:`ChallengesResource.attachments`.
        """
        response = self._http.request("GET", f"/instances/{instance_id}/attachments")
        _raise_for_status(response)
        payload = response.json()
        return AttachmentList.model_validate(payload)

    def download_attachment(self, instance_id: str, filename: str) -> bytes:
        """Download a single per-instance attachment as raw bytes.

        For challenges with per-team Jinja templates this returns the
        team-specific render; for static attachments it's the same
        bytes as :meth:`ChallengesResource.download_attachment` would yield.
        """
        response = self._http.request("GET", f"/instances/{instance_id}/attachments/{filename}")
        _raise_for_status(response)
        raw = response.content
        return raw

    def openvpn_config(self, instance_id: str) -> bytes:
        """Download the per-instance OpenVPN client config as raw bytes.

        Only meaningful for challenges that declare
        ``network_topology: engagement`` in metadata.yaml. The body is
        the rendered ``.ovpn`` file the player or automated agent
        passes to ``openvpn --config …`` to land on the challenge's
        DMZ docker network. The platform serves a 404 for simple-mode
        instances.

        Raises a ``CTFyError`` (404) when the challenge isn't an
        engagement-mode one, when the instance hasn't yet reached the
        running state, or when the node's openvpn container hasn't
        finished its PKI bootstrap.
        """
        response = self._http.request("GET", f"/instances/{instance_id}/openvpn-config")
        _raise_for_status(response)
        raw = response.content
        return raw

    def traffic(self, instance_id: str, *, competition_id: str = "") -> dict[str, Any]:
        """Return the parsed mitmproxy traffic for a running instance.

        The flow file lives on the worker node; the platform proxies
        the fetch. ``competition_id`` is only sent when non-empty.

        Returns an empty dict when no capture exists (sidecar disabled,
        node offline, instance never reached running) so callers can
        persist conditionally without try/except. Any non-dict JSON
        body is likewise mapped to ``{}``.
        """
        query = {}
        if competition_id:
            query["competition_id"] = competition_id
        response = self._http.request("GET", f"/instances/{instance_id}/traffic", params=query)
        _raise_for_status(response)
        payload = response.json()
        if not isinstance(payload, dict):
            return {}
        return payload
class InstancesResource:
    """Launch, inspect and tear down challenge instances."""

    def __init__(self, http: BaseHttpClient) -> None:
        """Bind the resource to the HTTP client used for every request."""
        self._http = http

    def start(
        self,
        challenge_id: str,
        ttl: int | None = None,
        *,
        competition_id: str = "",
        timeout: int = 300,
        poll_interval: float = 2.0,
        proxy_output_dir: str | None = None,
    ) -> InstanceReadyResult:
        """Launch a challenge instance and block until it reports ready.

        POSTs the launch request to ``/instances`` and then polls the
        newly created instance id.

        Args:
            challenge_id: Challenge ID (e.g., "XBOW-047").
            ttl: Per-instance TTL override in seconds. ``None`` (the
                default) is left out of the request so the platform's
                ``default_instance_ttl_s`` setting applies
                (admin-tunable, currently 24h). Explicit values are
                clamped to the platform's ``max_instance_ttl_s``.
            competition_id: Which competition to spin the instance up
                under. Required when the calling user is on more than
                one team — agents tied to a single comp can leave it
                empty and the server picks the unambiguous team.
            timeout: Max seconds to wait for ready.
            poll_interval: Seconds between status polls.
            proxy_output_dir: Host path to store proxy traffic
                captures. Only sent when non-empty.

        Returns:
            ``InstanceReadyResult`` with attack surface + sandbox
            network info. Its ``.surface`` attribute provides backward
            compatibility.
        """
        payload: dict[str, Any] = {"challenge_id": challenge_id, "competition_id": competition_id}
        # Optional fields are omitted entirely so the server applies its defaults.
        if ttl is not None:
            payload["ttl"] = ttl
        if proxy_output_dir:
            payload["proxy_output_dir"] = proxy_output_dir

        response = self._http.request("POST", "/instances", json=payload)
        _raise_for_status(response)
        launched = StartResponse.model_validate(response.json())
        return _poll_instance_ready(
            self._http._client,
            launched.id,
            timeout=timeout,
            poll_interval=poll_interval,
        )

    def get(self, instance_id: str) -> InstanceInfo:
        """Fetch the full ``InstanceInfo`` for one running instance.

        The payload carries the per-instance ``questions`` list with
        each question's ``unlocked`` / ``answered_correctly`` /
        ``attempts_remaining`` state — the surface multi-milestone
        solvers iterate on. The prompt is empty for still-locked
        questions so the route hint doesn't leak before the
        prerequisite is solved.

        See :meth:`status` for the lighter status-only payload (no
        questions, no spec metadata) used by polling loops.
        """
        response = self._http.request("GET", f"/instances/{instance_id}")
        _raise_for_status(response)
        payload = response.json()
        return InstanceInfo.model_validate(payload)

    def iter_pending_questions(self, instance_id: str) -> Iterator[InstanceQuestionInfo]:
        """Yield every currently-pending question on a running instance.

        A question is "pending" when it is unlocked (its ``requires:``
        chain is satisfied) and not yet correctly answered by the
        calling team. Once a batch has been fully consumed the
        ``InstanceInfo`` is fetched again, so questions unlocked by the
        consumer's own answers show up in the following batch.

        Iteration ends when either:

        * nothing is pending any more — the natural "challenge fully
          solved" exit;
        * **no progress** was made — the same set of pending question
          ids comes back twice in a row, meaning the consumer's solver
          isn't capturing anything. The generator stops silently
          rather than looping forever; call :meth:`get` to see what is
          still stuck.

        Typical multi-milestone loop::

            for q in client.instances.iter_pending_questions(instance_id):
                ans = my_solver(q.prompt)
                client.submissions.submit(instance_id, ans, question_id=q.id)

        Args:
            instance_id: The instance to walk.
        """
        last_seen_ids: frozenset[str] | None = None
        while True:
            snapshot = self.get(instance_id)
            batch = []
            for question in snapshot.questions:
                if not question.unlocked or question.answered_correctly:
                    continue
                batch.append(question)

            batch_ids = frozenset(question.id for question in batch)
            # Stop when everything is solved, or when the previous batch
            # came back unchanged (the solver made no progress). The
            # consumer can call this method again later to resume.
            if not batch or batch_ids == last_seen_ids:
                return
            last_seen_ids = batch_ids

            yield from batch

    def status(self, instance_id: str) -> InstanceStatusResponse:
        """Fetch instance status and attack surface."""
        response = self._http.request("GET", f"/instances/{instance_id}/status")
        _raise_for_status(response)
        payload = response.json()
        return InstanceStatusResponse.model_validate(payload)

    def stop(self, instance_id: str) -> None:
        """Tear down ``instance_id``.

        Idempotent: a 404 from the server means the instance is already
        gone (auto-stopped after solve, TTL expiry, admin teardown) and
        is treated as success.
        """
        response = self._http.request("DELETE", f"/instances/{instance_id}")
        if response.status_code == 404:
            # Already gone — nothing left to tear down.
            return
        _raise_for_status(response)

    def renew(self, instance_id: str, ttl: int | None = None) -> RenewResponse:
        """Extend the TTL of a running instance.

        With ``ttl=None`` (the default) no ``ttl`` parameter is sent and
        the platform's ``default_instance_ttl_s`` setting applies.
        Explicit values are clamped to ``max_instance_ttl_s``.
        """
        query: dict[str, Any] = {} if ttl is None else {"ttl": ttl}
        response = self._http.request("POST", f"/instances/{instance_id}/renew", params=query)
        _raise_for_status(response)
        return RenewResponse.model_validate(response.json())

    def list(
        self,
        challenge_id: str = "",
        status: str = "",
        q: str = "",
        offset: int = 0,
        limit: int = 50,
    ) -> builtins.list[InstanceInfo]:
        """List running instances, one page at a time.

        ``offset`` and ``limit`` are always sent; ``challenge_id``,
        ``status`` and ``q`` are only forwarded when non-empty.
        """
        query: dict[str, Any] = {"offset": offset, "limit": limit}
        optional_filters = {"challenge_id": challenge_id, "status": status, "q": q}
        query.update({name: value for name, value in optional_filters.items() if value})

        response = self._http.request("GET", "/instances", params=query)
        _raise_for_status(response)
        return _extract_items(response.json(), InstanceInfo)

    def attachments(self, instance_id: str) -> AttachmentList:
        """List the per-instance (post-launch) attachments.

        For challenges with ``attachments/<name>.tpl`` Jinja templates
        the names + sizes here are the *rendered* per-team-unique
        siblings; for ones with only static attachments the list is
        the same as :meth:`ChallengesResource.attachments`.
        """
        response = self._http.request("GET", f"/instances/{instance_id}/attachments")
        _raise_for_status(response)
        payload = response.json()
        return AttachmentList.model_validate(payload)

    def download_attachment(self, instance_id: str, filename: str) -> bytes:
        """Download a single per-instance attachment as raw bytes.

        For challenges with per-team Jinja templates this returns the
        team-specific render; for static attachments it's the same
        bytes as :meth:`ChallengesResource.download_attachment` would yield.
        """
        response = self._http.request("GET", f"/instances/{instance_id}/attachments/{filename}")
        _raise_for_status(response)
        raw = response.content
        return raw

    def openvpn_config(self, instance_id: str) -> bytes:
        """Download the per-instance OpenVPN client config as raw bytes.

        Only meaningful for challenges that declare
        ``network_topology: engagement`` in metadata.yaml. The body is
        the rendered ``.ovpn`` file the player or automated agent
        passes to ``openvpn --config …`` to land on the challenge's
        DMZ docker network. The platform serves a 404 for simple-mode
        instances.

        Raises a ``CTFyError`` (404) when the challenge isn't an
        engagement-mode one, when the instance hasn't yet reached the
        running state, or when the node's openvpn container hasn't
        finished its PKI bootstrap.
        """
        response = self._http.request("GET", f"/instances/{instance_id}/openvpn-config")
        _raise_for_status(response)
        raw = response.content
        return raw

    def traffic(self, instance_id: str, *, competition_id: str = "") -> dict[str, Any]:
        """Return the parsed mitmproxy traffic for a running instance.

        The flow file lives on the worker node; the platform proxies
        the fetch. ``competition_id`` is only sent when non-empty.

        Returns an empty dict when no capture exists (sidecar disabled,
        node offline, instance never reached running) so callers can
        persist conditionally without try/except. Any non-dict JSON
        body is likewise mapped to ``{}``.
        """
        query = {}
        if competition_id:
            query["competition_id"] = competition_id
        response = self._http.request("GET", f"/instances/{instance_id}/traffic", params=query)
        _raise_for_status(response)
        payload = response.json()
        if not isinstance(payload, dict):
            return {}
        return payload
Start / inspect / tear down challenge instances.
def start(
    self,
    challenge_id: str,
    ttl: int | None = None,
    *,
    competition_id: str = "",
    timeout: int = 300,
    poll_interval: float = 2.0,
    proxy_output_dir: str | None = None,
) -> InstanceReadyResult:
    """Launch a challenge instance and block until it reports ready.

    POSTs the launch request to ``/instances`` and then polls the
    newly created instance id.

    Args:
        challenge_id: Challenge ID (e.g., "XBOW-047").
        ttl: Per-instance TTL override in seconds. ``None`` (the
            default) is left out of the request so the platform's
            ``default_instance_ttl_s`` setting applies
            (admin-tunable, currently 24h). Explicit values are
            clamped to the platform's ``max_instance_ttl_s``.
        competition_id: Which competition to spin the instance up
            under. Required when the calling user is on more than
            one team — agents tied to a single comp can leave it
            empty and the server picks the unambiguous team.
        timeout: Max seconds to wait for ready.
        poll_interval: Seconds between status polls.
        proxy_output_dir: Host path to store proxy traffic
            captures. Only sent when non-empty.

    Returns:
        ``InstanceReadyResult`` with attack surface + sandbox
        network info. Its ``.surface`` attribute provides backward
        compatibility.
    """
    payload: dict[str, Any] = {"challenge_id": challenge_id, "competition_id": competition_id}
    # Optional fields are omitted entirely so the server applies its defaults.
    if ttl is not None:
        payload["ttl"] = ttl
    if proxy_output_dir:
        payload["proxy_output_dir"] = proxy_output_dir

    response = self._http.request("POST", "/instances", json=payload)
    _raise_for_status(response)
    launched = StartResponse.model_validate(response.json())
    return _poll_instance_ready(
        self._http._client,
        launched.id,
        timeout=timeout,
        poll_interval=poll_interval,
    )
Start instance and wait until ready.
Returns InstanceReadyResult with attack surface + sandbox network info. The .surface attribute provides backward compatibility.
Arguments:
- challenge_id: Challenge ID (e.g., "XBOW-047").
- ttl: Per-instance TTL override in seconds.
None (the default) delegates to the platform's default_instance_ttl_s setting (admin-tunable, currently 24h). Explicit values are clamped to the platform's max_instance_ttl_s.
- competition_id: Which competition to spin the instance up under. Required when the calling user is on more than one team — agents tied to a single comp can leave it empty and the server picks the unambiguous team.
- timeout: Max seconds to wait for ready.
- poll_interval: Seconds between status polls.
- proxy_output_dir: Host path to store proxy traffic captures.
def get(self, instance_id: str) -> InstanceInfo:
    """Fetch the full ``InstanceInfo`` for one running instance.

    The payload carries the per-instance ``questions`` list with
    each question's ``unlocked`` / ``answered_correctly`` /
    ``attempts_remaining`` state — the surface multi-milestone
    solvers iterate on. The prompt is empty for still-locked
    questions so the route hint doesn't leak before the
    prerequisite is solved.

    See :meth:`status` for the lighter status-only payload (no
    questions, no spec metadata) used by polling loops.
    """
    response = self._http.request("GET", f"/instances/{instance_id}")
    _raise_for_status(response)
    payload = response.json()
    return InstanceInfo.model_validate(payload)
Get the full InstanceInfo for one running instance.
Carries the per-instance questions list with each question's
unlocked / answered_correctly / attempts_remaining
state — the surface multi-milestone solvers iterate on. The
prompt is empty for still-locked questions so the route hint
doesn't leak before the prerequisite is solved.
See status() for the lighter status-only payload (no
questions, no spec metadata) used by polling loops.
def iter_pending_questions(self, instance_id: str) -> Iterator[InstanceQuestionInfo]:
    """Yield every currently-pending question on a running instance.

    A question is "pending" when it is unlocked (its ``requires:``
    chain is satisfied) and not yet correctly answered by the
    calling team. Once a batch has been fully consumed the
    ``InstanceInfo`` is fetched again, so questions unlocked by the
    consumer's own answers show up in the following batch.

    Iteration ends when either:

    * nothing is pending any more — the natural "challenge fully
      solved" exit;
    * **no progress** was made — the same set of pending question
      ids comes back twice in a row, meaning the consumer's solver
      isn't capturing anything. The generator stops silently
      rather than looping forever; call :meth:`get` to see what is
      still stuck.

    Typical multi-milestone loop::

        for q in client.instances.iter_pending_questions(instance_id):
            ans = my_solver(q.prompt)
            client.submissions.submit(instance_id, ans, question_id=q.id)

    Args:
        instance_id: The instance to walk.
    """
    last_seen_ids: frozenset[str] | None = None
    while True:
        snapshot = self.get(instance_id)
        batch = []
        for question in snapshot.questions:
            if not question.unlocked or question.answered_correctly:
                continue
            batch.append(question)

        batch_ids = frozenset(question.id for question in batch)
        # Stop when everything is solved, or when the previous batch
        # came back unchanged (the solver made no progress). The
        # consumer can call this method again later to resume.
        if not batch or batch_ids == last_seen_ids:
            return
        last_seen_ids = batch_ids

        yield from batch
Yield each currently-pending question on a running instance.
"Pending" = unlocked (the requires: chain is satisfied)
and not yet correctly answered by the calling team. After
every batch is exhausted, re-fetches InstanceInfo so
newly-unlocked questions (whose prerequisite was just solved
by the consumer) flow through on the next iteration.
Terminates on either:
- no pending questions left — the natural "challenge fully solved" exit;
- no progress — the same set of pending question ids
comes back twice in a row, meaning the consumer's solver
isn't capturing anything. Stops silently rather than
infinite-looping; inspect the caller's last
get() to see what's still stuck.
This is the ergonomic helper for the common multi-milestone loop::
for q in client.instances.iter_pending_questions(instance_id):
ans = my_solver(q.prompt)
client.submissions.submit(instance_id, ans, question_id=q.id)
Arguments:
- instance_id: The instance to walk.
def status(self, instance_id: str) -> InstanceStatusResponse:
    """Fetch instance status and attack surface."""
    response = self._http.request("GET", f"/instances/{instance_id}/status")
    _raise_for_status(response)
    payload = response.json()
    return InstanceStatusResponse.model_validate(payload)
Get instance status and attack surface.
def stop(self, instance_id: str) -> None:
    """Tear down ``instance_id``.

    Idempotent: a 404 from the server means the instance is already
    gone (auto-stopped after solve, TTL expiry, admin teardown) and
    is treated as success.
    """
    response = self._http.request("DELETE", f"/instances/{instance_id}")
    if response.status_code == 404:
        # Already gone — nothing left to tear down.
        return
    _raise_for_status(response)
Tear down instance_id. Idempotent: a 404 from the server
means the instance is already gone (auto-stopped after solve,
TTL expiry, admin teardown) and is treated as success.
def renew(self, instance_id: str, ttl: int | None = None) -> RenewResponse:
    """Extend the TTL of a running instance.

    With ``ttl=None`` (the default) no ``ttl`` parameter is sent and
    the platform's ``default_instance_ttl_s`` setting applies.
    Explicit values are clamped to ``max_instance_ttl_s``.
    """
    query: dict[str, Any] = {} if ttl is None else {"ttl": ttl}
    response = self._http.request("POST", f"/instances/{instance_id}/renew", params=query)
    _raise_for_status(response)
    return RenewResponse.model_validate(response.json())
Extend the TTL of a running instance.
ttl=None (the default) delegates to the platform's
default_instance_ttl_s setting. Explicit values are
clamped to max_instance_ttl_s.
def list(
    self,
    challenge_id: str = "",
    status: str = "",
    q: str = "",
    offset: int = 0,
    limit: int = 50,
) -> builtins.list[InstanceInfo]:
    """List running instances, one page at a time.

    ``offset`` and ``limit`` are always sent; ``challenge_id``,
    ``status`` and ``q`` are only forwarded when non-empty.
    """
    query: dict[str, Any] = {"offset": offset, "limit": limit}
    optional_filters = {"challenge_id": challenge_id, "status": status, "q": q}
    query.update({name: value for name, value in optional_filters.items() if value})

    response = self._http.request("GET", "/instances", params=query)
    _raise_for_status(response)
    return _extract_items(response.json(), InstanceInfo)
List all running instances.
def attachments(self, instance_id: str) -> AttachmentList:
    """List the per-instance (post-launch) attachments.

    For challenges with ``attachments/<name>.tpl`` Jinja templates
    the names + sizes here are the *rendered* per-team-unique
    siblings; for ones with only static attachments the list is
    the same as :meth:`ChallengesResource.attachments`.
    """
    response = self._http.request("GET", f"/instances/{instance_id}/attachments")
    _raise_for_status(response)
    payload = response.json()
    return AttachmentList.model_validate(payload)
List per-instance (post-launch) attachments.
For challenges with attachments/<name>.tpl Jinja templates
the names + sizes here are the rendered per-team-unique
siblings; for ones with only static attachments the list is
the same as ChallengesResource.attachments().
def download_attachment(self, instance_id: str, filename: str) -> bytes:
    """Download a single per-instance attachment as raw bytes.

    For challenges with per-team Jinja templates this returns the
    team-specific render; for static attachments it's the same
    bytes as :meth:`ChallengesResource.download_attachment` would yield.
    """
    response = self._http.request("GET", f"/instances/{instance_id}/attachments/{filename}")
    _raise_for_status(response)
    raw = response.content
    return raw
Download one per-instance attachment as raw bytes.
For challenges with per-team Jinja templates this returns the
team-specific render; for static attachments it's the same
bytes as ChallengesResource.download_attachment() would yield.
def openvpn_config(self, instance_id: str) -> bytes:
    """Download the per-instance OpenVPN client config as raw bytes.

    Only meaningful for challenges that declare
    ``network_topology: engagement`` in metadata.yaml. The body is
    the rendered ``.ovpn`` file the player or automated agent
    passes to ``openvpn --config …`` to land on the challenge's
    DMZ docker network. The platform serves a 404 for simple-mode
    instances.

    Raises a ``CTFyError`` (404) when the challenge isn't an
    engagement-mode one, when the instance hasn't yet reached the
    running state, or when the node's openvpn container hasn't
    finished its PKI bootstrap.
    """
    response = self._http.request("GET", f"/instances/{instance_id}/openvpn-config")
    _raise_for_status(response)
    raw = response.content
    return raw
Download the per-instance OpenVPN client config as raw bytes.
Only meaningful for challenges that declare
network_topology: engagement in metadata.yaml. The body is
the rendered .ovpn file the player or automated agent
passes to openvpn --config … to land on the challenge's
DMZ docker network. The platform serves a 404 for simple-mode
instances.
Raises a CTFyError (404) when the challenge isn't an
engagement-mode one, when the instance hasn't yet reached the
running state, or when the node's openvpn container hasn't
finished its PKI bootstrap.
def traffic(self, instance_id: str, *, competition_id: str = "") -> dict[str, Any]:
    """Return the parsed mitmproxy traffic for a running instance.

    The flow file lives on the worker node; the platform proxies
    the fetch. ``competition_id`` is only sent when non-empty.

    Returns an empty dict when no capture exists (sidecar disabled,
    node offline, instance never reached running) so callers can
    persist conditionally without try/except. Any non-dict JSON
    body is likewise mapped to ``{}``.
    """
    query = {}
    if competition_id:
        query["competition_id"] = competition_id
    response = self._http.request("GET", f"/instances/{instance_id}/traffic", params=query)
    _raise_for_status(response)
    payload = response.json()
    if not isinstance(payload, dict):
        return {}
    return payload
Parsed mitmproxy traffic for a running instance. The flow file lives on the worker node; the platform proxies the fetch.
Returns an empty dict when no capture exists (sidecar disabled, node offline, instance never reached running) so callers can persist conditionally without try/except.