ctfy.sdk
Python SDK for the ctfy platform.
Drives the platform's /api/v1/* REST API from Python with typed Pydantic
returns and auto-retry on transient errors. Two clients, split by audience —
the same line the ctfy / ctfy-admin CLI binaries draw:
~ctfy.sdk.client.PlatformClient— the player client. This is what agents and players want. Methods are grouped into resource namespaces (client.teams,client.instances, …) plus a per-competition handle viaclient.competition(id).~ctfy.sdk.admin.AdminClient— the operator client (/api/v1/admin/*and fleet management). Reached viaclient.adminorAdminClient.connect(). Kept separate so the player reference never surfaces admin endpoints they can't call.
Install
The full ctfy package pulls in the server too. For an agent /
harness that only needs the client, use the client extra::
pip install "ctfy[client]"
Authenticate
Sign in to the platform in the browser, then go to Settings → API
tokens and mint a fine-grained token (prefix pf_). These tokens
launch instances and submit answers on the team's behalf but cannot
mint more tokens, re-link OAuth providers, or change team
membership — so a leaked token never costs you the account.
Export it as CTFY_TOKEN for the CLI / MCP server, or pass it
directly to PlatformClient.
Quick start
End-to-end: discover a competition, register, list its challenges, launch one, read the attack surface, submit a captured flag::
from ctfy.sdk import PlatformClient
client = PlatformClient("https://ctfy.authu.online", token="pf_xxx")
# 1) Find a running competition
comps = client.competitions.list(phase="running")
comp = comps[0]
print(comp.id, comp.title)
# 2) Scope to the competition, then register Solo (skip if already on a
# team: inspect ``client.me.get().competition_teams`` first)
comp_api = client.competition(comp.id)
team = comp_api.register(mode="solo")
print(team.id)
# 3) List the challenges scoped to this competition (curated order)
challenges = comp_api.challenges()
for ch in challenges:
print(f" {ch.id} [{ch.category.value}/{ch.difficulty}] {ch.name}")
# 4) Launch the first challenge — blocks until ready
ready = client.instances.start(
challenges[0].id, competition_id=comp.id
)
print(f"instance {ready.id}")
# 5) Read the attack surface (services + credentials, plus VPN for
# engagement-mode challenges)
for svc in ready.surface.services:
print(f" [{svc.service_type.value}] {svc.url}")
if svc.credentials:
print(f" {svc.credentials.username}/{svc.credentials.password}")
if ready.surface.vpn:
print(f" VPN: {ready.surface.vpn.host}:{ready.surface.vpn.port}")
# 6) Submit a captured flag (oracle-probe via
# ``client.submissions.verify()`` first if you don't want to burn
# an audit row)
result = client.submissions.submit(ready.id, "FLAG{your_flag_here}")
print(f"correct={result.correct} solved={result.challenge_fully_solved}")
# 7) Tear down (or let TTL expire — default 24h, admin-tunable)
client.instances.stop(ready.id)
A runnable, interactive copy of this script lives at
examples/quickstart.py.
Player surface
Global / account namespaces on ~ctfy.sdk.client.PlatformClient:
client.teams/client.users— public team + user profile reads.client.me— the caller's profile, inbox, account export/delete, achievements, solve + milestone progress.client.auth— password register/login, OAuth identities, API token CRUD, sign-in discovery.client.competitions— discover competitions (list/get).client.challenges— global catalog, attachments, facets, feedback chips.client.instances—start(blocks until ready),status,stop,renew,list(by instance id; the competition is inferred).client.submissions—submit(graded),verify(oracle probe, no record),list, and QA challenges.client.scoreboard— global standings, per-challenge stats, snapshots.client.achievements/client.activities/client.nodes— public badge catalog, activity log, worker-node list.
Per-competition scope — client.competition(id) returns a
~ctfy.sdk.competition.Competition handle for the operations that are
inherently competition-scoped, so competition_id is named once:
comp.register(mode="solo" | "create" | "join", …)— get on a team.comp.team— your team:rename/leave/kick, pluscomp.team.invites(codes + email invites) andcomp.team.requests(approve / reject join requests).comp.challenges()/comp.search_teams()and the standings (comp.scoreboard()/score_history/score_distribution/solve_matrix/challenge_breakdown).
Top-level convenience methods stay flat: client.health(),
client.get_meta(), client.cluster_info(), client.events(),
client.check_server_compatibility().
Admin surface
client.admin (an ~ctfy.sdk.admin.AdminClient) carries the
operator endpoints, grouped the same way: admin.users,
admin.competitions, admin.announcements, admin.achievements,
admin.challenges, admin.instances, admin.records (instance
forensics), admin.nodes, admin.observability, admin.settings,
admin.competition_admins. Operator tooling without a player client can use
AdminClient.connect(url, token=...).
Realtime
~~~~
PlatformClient.events() yields an auto-reconnecting
~ctfy.sdk.events.EventStream over the SSE endpoint —
solve broadcasts, instance state changes, scoreboard refreshes.
See also
PlatformClient— the player client; every namespace method has full docstrings.AdminClient— the operator client.ctfy.server.models— the typed response shapes (ChallengeInfo,InstanceInfo,SubmissionResponse, …) that SDK methods return. Re-using the server's Pydantic models means the wire shape can never drift between the two sides.<platform>/docs— auto-generated FastAPI Swagger UI for the raw REST API (language-agnostic).<platform>/api/v1/mcp/— streamable-HTTP MCP endpoint with the same operations as MCP tools, if the agent speaks MCP instead of Python.~ctfy.sdk.node_client.NodeClient— separate client for worker-node ↔ platform RPCs; not what agent harnesses want.
1"""Python SDK for the ctfy platform. 2 3Drives the platform's ``/api/v1/*`` REST API from Python with typed Pydantic 4returns and auto-retry on transient errors. Two clients, split by audience — 5the same line the ``ctfy`` / ``ctfy-admin`` CLI binaries draw: 6 7- :class:`~ctfy.sdk.client.PlatformClient` — the **player** client. This is 8 what agents and players want. Methods are grouped into resource namespaces 9 (``client.teams``, ``client.instances``, …) plus a per-competition handle 10 via ``client.competition(id)``. 11- :class:`~ctfy.sdk.admin.AdminClient` — the **operator** client 12 (``/api/v1/admin/*`` and fleet management). Reached via ``client.admin`` or 13 :meth:`AdminClient.connect`. Kept separate so the player reference never 14 surfaces admin endpoints they can't call. 15 16Install 17------- 18 19The full ``ctfy`` package pulls in the server too. For an agent / 20harness that only needs the client, use the ``client`` extra:: 21 22 pip install "ctfy[client]" 23 24Authenticate 25------------ 26 27Sign in to the platform in the browser, then go to **Settings → API 28tokens** and mint a fine-grained token (prefix ``pf_``). These tokens 29launch instances and submit answers on the team's behalf but cannot 30mint more tokens, re-link OAuth providers, or change team 31membership — so a leaked token never costs you the account. 32 33Export it as ``CTFY_TOKEN`` for the CLI / MCP server, or pass it 34directly to :class:`PlatformClient`. 35 36Quick start 37----------- 38 39End-to-end: discover a competition, register, list its challenges, 40launch one, read the attack surface, submit a captured flag:: 41 42 from ctfy.sdk import PlatformClient 43 44 client = PlatformClient("https://ctfy.authu.online", token="pf_xxx") 45 46 # 1) Find a running competition 47 comps = client.competitions.list(phase="running") 48 comp = comps[0] 49 print(comp.id, comp.title) 50 51 # 2) Scope to the competition, then register Solo (skip if already on a 52 # team: inspect ``client.me.get().competition_teams`` first) 53 comp_api = client.competition(comp.id) 54 team = comp_api.register(mode="solo") 55 print(team.id) 56 57 # 3) List the challenges scoped to this competition (curated order) 58 challenges = comp_api.challenges() 59 for ch in challenges: 60 print(f" {ch.id} [{ch.category.value}/{ch.difficulty}] {ch.name}") 61 62 # 4) Launch the first challenge — blocks until ready 63 ready = client.instances.start( 64 challenges[0].id, competition_id=comp.id 65 ) 66 print(f"instance {ready.id}") 67 68 # 5) Read the attack surface (services + credentials, plus VPN for 69 # engagement-mode challenges) 70 for svc in ready.surface.services: 71 print(f" [{svc.service_type.value}] {svc.url}") 72 if svc.credentials: 73 print(f" {svc.credentials.username}/{svc.credentials.password}") 74 if ready.surface.vpn: 75 print(f" VPN: {ready.surface.vpn.host}:{ready.surface.vpn.port}") 76 77 # 6) Submit a captured flag (oracle-probe via 78 # ``client.submissions.verify()`` first if you don't want to burn 79 # an audit row) 80 result = client.submissions.submit(ready.id, "FLAG{your_flag_here}") 81 print(f"correct={result.correct} solved={result.challenge_fully_solved}") 82 83 # 7) Tear down (or let TTL expire — default 24h, admin-tunable) 84 client.instances.stop(ready.id) 85 86A runnable, interactive copy of this script lives at 87``examples/quickstart.py``. 88 89Player surface 90-------------- 91 92Global / account namespaces on :class:`~ctfy.sdk.client.PlatformClient`: 93 94- ``client.teams`` / ``client.users`` — public team + user profile reads. 95- ``client.me`` — the caller's profile, inbox, account export/delete, 96 achievements, solve + milestone progress. 97- ``client.auth`` — password register/login, OAuth identities, API token 98 CRUD, sign-in discovery. 99- ``client.competitions`` — discover competitions (``list`` / ``get``). 100- ``client.challenges`` — global catalog, attachments, facets, feedback chips. 101- ``client.instances`` — ``start`` (blocks until ready), ``status``, ``stop``, 102 ``renew``, ``list`` (by instance id; the competition is inferred). 103- ``client.submissions`` — ``submit`` (graded), ``verify`` (oracle probe, 104 no record), ``list``, and QA challenges. 105- ``client.scoreboard`` — global standings, per-challenge stats, snapshots. 106- ``client.achievements`` / ``client.activities`` / ``client.nodes`` — public 107 badge catalog, activity log, worker-node list. 108 109Per-competition scope — ``client.competition(id)`` returns a 110:class:`~ctfy.sdk.competition.Competition` handle for the operations that are 111*inherently* competition-scoped, so ``competition_id`` is named once: 112 113- ``comp.register(mode="solo" | "create" | "join", …)`` — get on a team. 114- ``comp.team`` — your team: ``rename`` / ``leave`` / ``kick``, plus 115 ``comp.team.invites`` (codes + email invites) and ``comp.team.requests`` 116 (approve / reject join requests). 117- ``comp.challenges()`` / ``comp.search_teams()`` and the standings 118 (``comp.scoreboard()`` / ``score_history`` / ``score_distribution`` / 119 ``solve_matrix`` / ``challenge_breakdown``). 120 121Top-level convenience methods stay flat: ``client.health()``, 122``client.get_meta()``, ``client.cluster_info()``, ``client.events()``, 123``client.check_server_compatibility()``. 124 125Admin surface 126------------- 127 128``client.admin`` (an :class:`~ctfy.sdk.admin.AdminClient`) carries the 129operator endpoints, grouped the same way: ``admin.users``, 130``admin.competitions``, ``admin.announcements``, ``admin.achievements``, 131``admin.challenges``, ``admin.instances``, ``admin.records`` (instance 132forensics), ``admin.nodes``, ``admin.observability``, ``admin.settings``, 133``admin.competition_admins``. Operator tooling without a player client can use 134``AdminClient.connect(url, token=...)``. 135 136Realtime 137~~~~~~~~ 138 139:meth:`PlatformClient.events` yields an auto-reconnecting 140:class:`~ctfy.sdk.events.EventStream` over the SSE endpoint — 141solve broadcasts, instance state changes, scoreboard refreshes. 142 143See also 144-------- 145 146- :class:`PlatformClient` — the player client; every namespace method has 147 full docstrings. 148- :class:`AdminClient` — the operator client. 149- ``ctfy.server.models`` — the typed response shapes 150 (``ChallengeInfo``, ``InstanceInfo``, ``SubmissionResponse``, …) 151 that SDK methods return. Re-using the server's Pydantic models means 152 the wire shape can never drift between the two sides. 153- ``<platform>/docs`` — auto-generated FastAPI Swagger UI for the 154 raw REST API (language-agnostic). 155- ``<platform>/api/v1/mcp/`` — streamable-HTTP MCP endpoint with 156 the same operations as MCP tools, if the agent speaks MCP 157 instead of Python. 158- :class:`~ctfy.sdk.node_client.NodeClient` — separate client for 159 worker-node ↔ platform RPCs; not what agent harnesses want. 160""" 161 162from ctfy.sdk.admin import AdminClient 163from ctfy.sdk.client import PlatformClient 164from ctfy.sdk.node_client import NodeClient 165 166__all__ = ["AdminClient", "NodeClient", "PlatformClient"]
55class AdminClient: 56 """Admin / operator surface, grouped into resource namespaces. 57 58 Namespaces: :attr:`users`, :attr:`competitions`, :attr:`announcements`, 59 :attr:`achievements`, :attr:`challenges`, :attr:`instances`, 60 :attr:`records`, :attr:`nodes`, :attr:`observability`, :attr:`settings`, 61 :attr:`competition_admins`. 62 """ 63 64 def __init__(self, http: BaseHttpClient) -> None: 65 #: Shared transport (the parent PlatformClient when reached via 66 #: ``client.admin``, or an owned client when built via ``connect``). 67 self._http = http 68 69 @classmethod 70 def connect( 71 cls, 72 server_url: str, 73 token: str = "", 74 *, 75 max_retries: int = 3, 76 timeout: int = DEFAULT_CLIENT_TIMEOUT, 77 ) -> AdminClient: 78 """Build an ``AdminClient`` that owns its own transport. 79 80 For operator tooling that doesn't already hold a 81 :class:`PlatformClient`. The returned client is a context manager 82 that closes its transport on exit. 83 """ 84 base = server_url.rstrip("/") 85 http = BaseHttpClient(f"{base}/api/v1", token, timeout=timeout, max_retries=max_retries) 86 return cls(http) 87 88 @cached_property 89 def users(self) -> AdminUsersResource: 90 return AdminUsersResource(self._http) 91 92 @cached_property 93 def competitions(self) -> AdminCompetitionsResource: 94 return AdminCompetitionsResource(self._http) 95 96 @cached_property 97 def announcements(self) -> AdminAnnouncementsResource: 98 return AdminAnnouncementsResource(self._http) 99 100 @cached_property 101 def achievements(self) -> AdminAchievementsResource: 102 return AdminAchievementsResource(self._http) 103 104 @cached_property 105 def challenges(self) -> AdminChallengesResource: 106 return AdminChallengesResource(self._http) 107 108 @cached_property 109 def instances(self) -> AdminInstancesResource: 110 return AdminInstancesResource(self._http) 111 112 @cached_property 113 def records(self) -> AdminRecordsResource: 114 return AdminRecordsResource(self._http) 115 116 @cached_property 117 def nodes(self) -> AdminNodesResource: 118 return AdminNodesResource(self._http) 119 120 @cached_property 121 def observability(self) -> AdminObservabilityResource: 122 return AdminObservabilityResource(self._http) 123 124 @cached_property 125 def settings(self) -> AdminSettingsResource: 126 return AdminSettingsResource(self._http) 127 128 @cached_property 129 def competition_admins(self) -> AdminCompetitionAdminsResource: 130 return AdminCompetitionAdminsResource(self._http) 131 132 def close(self) -> None: 133 """Close the underlying transport. 134 135 Only call this on a standalone client built via :meth:`connect`; 136 when reached via ``PlatformClient.admin`` the transport is shared 137 with — and closed by — the parent player client. 138 """ 139 self._http.close() 140 141 def __enter__(self) -> Self: 142 return self 143 144 def __exit__( 145 self, 146 exc_type: type[BaseException] | None, 147 exc: BaseException | None, 148 tb: TracebackType | None, 149 ) -> None: 150 self.close()
Admin / operator surface, grouped into resource namespaces.
Namespaces: users, competitions, announcements,
achievements, challenges, instances,
records, nodes, observability, settings,
competition_admins.
69 @classmethod 70 def connect( 71 cls, 72 server_url: str, 73 token: str = "", 74 *, 75 max_retries: int = 3, 76 timeout: int = DEFAULT_CLIENT_TIMEOUT, 77 ) -> AdminClient: 78 """Build an ``AdminClient`` that owns its own transport. 79 80 For operator tooling that doesn't already hold a 81 :class:`PlatformClient`. The returned client is a context manager 82 that closes its transport on exit. 83 """ 84 base = server_url.rstrip("/") 85 http = BaseHttpClient(f"{base}/api/v1", token, timeout=timeout, max_retries=max_retries) 86 return cls(http)
Build an AdminClient that owns its own transport.
For operator tooling that doesn't already hold a
PlatformClient. The returned client is a context manager
that closes its transport on exit.
132 def close(self) -> None: 133 """Close the underlying transport. 134 135 Only call this on a standalone client built via :meth:`connect`; 136 when reached via ``PlatformClient.admin`` the transport is shared 137 with — and closed by — the parent player client. 138 """ 139 self._http.close()
Close the underlying transport.
Only call this on a standalone client built via connect();
when reached via PlatformClient.admin the transport is shared
with — and closed by — the parent player client.
48class NodeClient(BaseHttpClient): 49 """Thin sync HTTP client; one instance per node URL.""" 50 51 def __init__( 52 self, 53 node_url: str, 54 token: str, 55 *, 56 timeout: int = DEFAULT_CLIENT_TIMEOUT, 57 ) -> None: 58 super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout) 59 60 # -- lifecycle ---------------------------------------------------------- 61 62 def start_instance( 63 self, 64 *, 65 challenge_id: str, 66 instance_id: str, 67 ttl: int, 68 answers: dict[str, str], 69 proxy_output_dir: str | None = None, 70 ) -> dict[str, Any]: 71 resp = self.request( 72 "POST", 73 "/instances", 74 json={ 75 "challenge_id": challenge_id, 76 "instance_id": instance_id, 77 "ttl": ttl, 78 "answers": answers, 79 "proxy_output_dir": proxy_output_dir, 80 }, 81 ) 82 _raise_for_node_status(resp) 83 body: dict[str, Any] = resp.json() 84 return body 85 86 def stop_instance(self, instance_id: str) -> None: 87 resp = self.request("DELETE", f"/instances/{instance_id}") 88 resp.raise_for_status() 89 90 def stop_all(self) -> None: 91 resp = self.request("POST", "/admin/stop-all") 92 resp.raise_for_status() 93 94 def rescan_challenges(self) -> dict[str, Any]: 95 """Tell the node to drop its spec cache and re-scan challenges_dir. 96 97 Returns ``{total, added, removed}`` so the platform can report 98 the per-node outcome of a cluster-wide rescan.""" 99 resp = self.request("POST", "/admin/rescan-challenges") 100 resp.raise_for_status() 101 body: dict[str, Any] = resp.json() 102 return body 103 104 # -- admin pre-build (image cache warming) ------------------------------ 105 106 def build_challenge(self, challenge_id: str) -> dict[str, Any]: 107 """Ask the node to pre-build images for *challenge_id*. 108 109 Fires-and-returns: the node persists ``status="building"`` and 110 spawns a daemon thread; the body returned here is that initial 111 state row. Polling :meth:`get_build_state` is how the platform 112 learns when it lands on ``built`` / ``failed``. 113 """ 114 resp = self.request("POST", f"/admin/challenges/{challenge_id}/build") 115 _raise_for_node_status(resp) 116 body: dict[str, Any] = resp.json() 117 return body 118 119 def build_all_challenges(self) -> dict[str, Any]: 120 """Queue background pre-build for every spec the node knows about. 121 122 Returns ``{queued, skipped_built, skipped_in_progress}`` so the 123 platform can report per-node what got picked up. Sequential 124 on the node side — no fan-out across the corpus. 125 """ 126 resp = self.request("POST", "/admin/challenges/build-all") 127 _raise_for_node_status(resp) 128 body: dict[str, Any] = resp.json() 129 return body 130 131 def get_build_state(self) -> dict[str, Any]: 132 """Fetch every per-challenge build-state row on this node. 133 134 Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``. 135 ``status`` is one of ``unbuilt`` / ``building`` / ``built`` / 136 ``failed``; ``unbuilt`` placeholders are synthesised for specs 137 the node has seen but never been asked to build. 138 """ 139 resp = self.request("GET", "/admin/challenges/build-state") 140 _raise_for_node_status(resp) 141 body: dict[str, Any] = resp.json() 142 return body 143 144 # -- status / health ---------------------------------------------------- 145 146 def get_status(self, instance_id: str) -> dict[str, Any]: 147 resp = self.request("GET", f"/instances/{instance_id}/status") 148 resp.raise_for_status() 149 body: dict[str, Any] = resp.json() 150 return body 151 152 def check_health(self, instance_id: str) -> bool: 153 resp = self.request("GET", f"/instances/{instance_id}/health") 154 resp.raise_for_status() 155 return bool(resp.json().get("is_healthy")) 156 157 def node_health(self) -> dict[str, Any]: 158 """Liveness + ``{running, capacity}`` for heartbeat.""" 159 resp = self.request("GET", "/health") 160 resp.raise_for_status() 161 body: dict[str, Any] = resp.json() 162 return body 163 164 # -- traffic / runtime -------------------------------------------------- 165 166 def get_traffic(self, instance_id: str) -> dict[str, Any]: 167 """Fetch mitmproxy flow data; file lives on the node's FS.""" 168 resp = self.request("GET", f"/instances/{instance_id}/traffic") 169 resp.raise_for_status() 170 body: dict[str, Any] = resp.json() 171 return body 172 173 def list_containers(self, instance_id: str) -> list[dict[str, Any]]: 174 """Enumerate every container in an instance's compose project. 175 176 Backs the admin-shell feature: the platform forwards a 177 ``GET /admin/instances/{id}/containers`` to the assigned node, 178 which returns one row per container (challenge services and 179 platform-injected sidecars alike). The WebSocket reverse-proxy 180 is opened separately and does not go through ``NodeClient``. 181 """ 182 resp = self.request("GET", f"/instances/{instance_id}/containers") 183 _raise_for_node_status(resp) 184 body: list[dict[str, Any]] = resp.json() 185 return body 186 187 def get_container_logs(self, instance_id: str) -> str: 188 """Fetch combined container stdout/stderr for archival.""" 189 resp = self.request("GET", f"/instances/{instance_id}/container-logs") 190 resp.raise_for_status() 191 return str(resp.json().get("logs") or "") 192 193 def get_pcap(self, instance_id: str) -> bytes: 194 """Fetch the raw tcpdump capture for *instance_id*. 195 196 Returns an empty ``bytes`` when no capture exists (sidecar 197 disabled, instance never reached the running state, etc.) so 198 callers can persist conditionally without try/except. 199 """ 200 resp = self.request("GET", f"/instances/{instance_id}/pcap") 201 if resp.status_code == 404: 202 return b"" 203 resp.raise_for_status() 204 return resp.content 205 206 def get_agent_runtime(self, instance_id: str) -> dict[str, Any]: 207 """Provider-agnostic runtime hints for attaching an agent (proxy URL, 208 CA PEM, optional Docker-specific names). Replaces the old 209 sandbox-network shape.""" 210 resp = self.request("GET", f"/instances/{instance_id}/agent-runtime") 211 resp.raise_for_status() 212 body: dict[str, Any] = resp.json() 213 return body 214 215 # -- per-instance rendered attachments --------------------------------- 216 217 def list_instance_attachments(self, instance_id: str) -> dict[str, Any]: 218 """List per-instance attachments rendered into the node's workdir. 219 220 Returns the raw JSON dict — caller projects through 221 :class:`AttachmentList` for typing. Used by the platform's 222 per-instance attachment endpoint to surface team-specific 223 rendered file listings.""" 224 resp = self.request("GET", f"/instances/{instance_id}/attachments") 225 resp.raise_for_status() 226 body: dict[str, Any] = resp.json() 227 return body 228 229 def get_openvpn_config(self, instance_id: str) -> bytes: 230 """Fetch the per-instance OpenVPN client config from the node. 231 232 The node reads ``/etc/openvpn/client.ovpn`` out of the instance's 233 ``openvpn`` service container (generated on first boot by the 234 ``ctfy/openvpn-base`` image's bootstrap). Returns the body 235 verbatim — the platform-side route adds the 236 ``Content-Disposition`` header before re-emitting to the player. 237 238 Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's 239 proxy route catches 404 and re-emits to the player, anything 240 else is treated as a 502. 241 """ 242 resp = self.request("GET", f"/instances/{instance_id}/openvpn-config") 243 resp.raise_for_status() 244 return resp.content 245 246 def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]: 247 """Download one per-instance attachment as ``(bytes, content_type)``. 248 249 Buffers the whole body — attachments are bounded (typical case 250 is a 10 KB binary or a small text file). For huge captures the 251 caller should hand the player a CDN URL instead. 252 253 Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's 254 proxy route catches 404 and re-emits to the player, anything 255 else is a 502.""" 256 resp = self.request( 257 "GET", 258 f"/instances/{instance_id}/attachments/{filename}", 259 ) 260 resp.raise_for_status() 261 return resp.content, resp.headers.get("content-type", "application/octet-stream")
Thin sync HTTP client; one instance per node URL.
62 def start_instance( 63 self, 64 *, 65 challenge_id: str, 66 instance_id: str, 67 ttl: int, 68 answers: dict[str, str], 69 proxy_output_dir: str | None = None, 70 ) -> dict[str, Any]: 71 resp = self.request( 72 "POST", 73 "/instances", 74 json={ 75 "challenge_id": challenge_id, 76 "instance_id": instance_id, 77 "ttl": ttl, 78 "answers": answers, 79 "proxy_output_dir": proxy_output_dir, 80 }, 81 ) 82 _raise_for_node_status(resp) 83 body: dict[str, Any] = resp.json() 84 return body
94 def rescan_challenges(self) -> dict[str, Any]: 95 """Tell the node to drop its spec cache and re-scan challenges_dir. 96 97 Returns ``{total, added, removed}`` so the platform can report 98 the per-node outcome of a cluster-wide rescan.""" 99 resp = self.request("POST", "/admin/rescan-challenges") 100 resp.raise_for_status() 101 body: dict[str, Any] = resp.json() 102 return body
Tell the node to drop its spec cache and re-scan challenges_dir.
Returns {total, added, removed} so the platform can report
the per-node outcome of a cluster-wide rescan.
106 def build_challenge(self, challenge_id: str) -> dict[str, Any]: 107 """Ask the node to pre-build images for *challenge_id*. 108 109 Fires-and-returns: the node persists ``status="building"`` and 110 spawns a daemon thread; the body returned here is that initial 111 state row. Polling :meth:`get_build_state` is how the platform 112 learns when it lands on ``built`` / ``failed``. 113 """ 114 resp = self.request("POST", f"/admin/challenges/{challenge_id}/build") 115 _raise_for_node_status(resp) 116 body: dict[str, Any] = resp.json() 117 return body
Ask the node to pre-build images for challenge_id.
Fires-and-returns: the node persists status="building" and
spawns a daemon thread; the body returned here is that initial
state row. Polling get_build_state() is how the platform
learns when it lands on built / failed.
119 def build_all_challenges(self) -> dict[str, Any]: 120 """Queue background pre-build for every spec the node knows about. 121 122 Returns ``{queued, skipped_built, skipped_in_progress}`` so the 123 platform can report per-node what got picked up. Sequential 124 on the node side — no fan-out across the corpus. 125 """ 126 resp = self.request("POST", "/admin/challenges/build-all") 127 _raise_for_node_status(resp) 128 body: dict[str, Any] = resp.json() 129 return body
Queue background pre-build for every spec the node knows about.
Returns {queued, skipped_built, skipped_in_progress} so the
platform can report per-node what got picked up. Sequential
on the node side — no fan-out across the corpus.
131 def get_build_state(self) -> dict[str, Any]: 132 """Fetch every per-challenge build-state row on this node. 133 134 Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``. 135 ``status`` is one of ``unbuilt`` / ``building`` / ``built`` / 136 ``failed``; ``unbuilt`` placeholders are synthesised for specs 137 the node has seen but never been asked to build. 138 """ 139 resp = self.request("GET", "/admin/challenges/build-state") 140 _raise_for_node_status(resp) 141 body: dict[str, Any] = resp.json() 142 return body
Fetch every per-challenge build-state row on this node.
Returns {"rows": [{challenge_id, status, built_at, error}, …]}.
status is one of unbuilt / building / built /
failed; unbuilt placeholders are synthesised for specs
the node has seen but never been asked to build.
157 def node_health(self) -> dict[str, Any]: 158 """Liveness + ``{running, capacity}`` for heartbeat.""" 159 resp = self.request("GET", "/health") 160 resp.raise_for_status() 161 body: dict[str, Any] = resp.json() 162 return body
Liveness + {running, capacity} for heartbeat.
166 def get_traffic(self, instance_id: str) -> dict[str, Any]: 167 """Fetch mitmproxy flow data; file lives on the node's FS.""" 168 resp = self.request("GET", f"/instances/{instance_id}/traffic") 169 resp.raise_for_status() 170 body: dict[str, Any] = resp.json() 171 return body
Fetch mitmproxy flow data; file lives on the node's FS.
173 def list_containers(self, instance_id: str) -> list[dict[str, Any]]: 174 """Enumerate every container in an instance's compose project. 175 176 Backs the admin-shell feature: the platform forwards a 177 ``GET /admin/instances/{id}/containers`` to the assigned node, 178 which returns one row per container (challenge services and 179 platform-injected sidecars alike). The WebSocket reverse-proxy 180 is opened separately and does not go through ``NodeClient``. 181 """ 182 resp = self.request("GET", f"/instances/{instance_id}/containers") 183 _raise_for_node_status(resp) 184 body: list[dict[str, Any]] = resp.json() 185 return body
Enumerate every container in an instance's compose project.
Backs the admin-shell feature: the platform forwards a
GET /admin/instances/{id}/containers to the assigned node,
which returns one row per container (challenge services and
platform-injected sidecars alike). The WebSocket reverse-proxy
is opened separately and does not go through NodeClient.
187 def get_container_logs(self, instance_id: str) -> str: 188 """Fetch combined container stdout/stderr for archival.""" 189 resp = self.request("GET", f"/instances/{instance_id}/container-logs") 190 resp.raise_for_status() 191 return str(resp.json().get("logs") or "")
Fetch combined container stdout/stderr for archival.
193 def get_pcap(self, instance_id: str) -> bytes: 194 """Fetch the raw tcpdump capture for *instance_id*. 195 196 Returns an empty ``bytes`` when no capture exists (sidecar 197 disabled, instance never reached the running state, etc.) so 198 callers can persist conditionally without try/except. 199 """ 200 resp = self.request("GET", f"/instances/{instance_id}/pcap") 201 if resp.status_code == 404: 202 return b"" 203 resp.raise_for_status() 204 return resp.content
Fetch the raw tcpdump capture for instance_id.
Returns an empty bytes when no capture exists (sidecar
disabled, instance never reached the running state, etc.) so
callers can persist conditionally without try/except.
206 def get_agent_runtime(self, instance_id: str) -> dict[str, Any]: 207 """Provider-agnostic runtime hints for attaching an agent (proxy URL, 208 CA PEM, optional Docker-specific names). Replaces the old 209 sandbox-network shape.""" 210 resp = self.request("GET", f"/instances/{instance_id}/agent-runtime") 211 resp.raise_for_status() 212 body: dict[str, Any] = resp.json() 213 return body
Provider-agnostic runtime hints for attaching an agent (proxy URL, CA PEM, optional Docker-specific names). Replaces the old sandbox-network shape.
217 def list_instance_attachments(self, instance_id: str) -> dict[str, Any]: 218 """List per-instance attachments rendered into the node's workdir. 219 220 Returns the raw JSON dict — caller projects through 221 :class:`AttachmentList` for typing. Used by the platform's 222 per-instance attachment endpoint to surface team-specific 223 rendered file listings.""" 224 resp = self.request("GET", f"/instances/{instance_id}/attachments") 225 resp.raise_for_status() 226 body: dict[str, Any] = resp.json() 227 return body
List per-instance attachments rendered into the node's workdir.
Returns the raw JSON dict — caller projects through
AttachmentList for typing. Used by the platform's
per-instance attachment endpoint to surface team-specific
rendered file listings.
229 def get_openvpn_config(self, instance_id: str) -> bytes: 230 """Fetch the per-instance OpenVPN client config from the node. 231 232 The node reads ``/etc/openvpn/client.ovpn`` out of the instance's 233 ``openvpn`` service container (generated on first boot by the 234 ``ctfy/openvpn-base`` image's bootstrap). Returns the body 235 verbatim — the platform-side route adds the 236 ``Content-Disposition`` header before re-emitting to the player. 237 238 Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's 239 proxy route catches 404 and re-emits to the player, anything 240 else is treated as a 502. 241 """ 242 resp = self.request("GET", f"/instances/{instance_id}/openvpn-config") 243 resp.raise_for_status() 244 return resp.content
Fetch the per-instance OpenVPN client config from the node.
The node reads /etc/openvpn/client.ovpn out of the instance's
openvpn service container (generated on first boot by the
ctfy/openvpn-base image's bootstrap). Returns the body
verbatim — the platform-side route adds the
Content-Disposition header before re-emitting to the player.
Raises httpx.HTTPStatusError on non-2xx — the platform's
proxy route catches 404 and re-emits to the player, anything
else is treated as a 502.
246 def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]: 247 """Download one per-instance attachment as ``(bytes, content_type)``. 248 249 Buffers the whole body — attachments are bounded (typical case 250 is a 10 KB binary or a small text file). For huge captures the 251 caller should hand the player a CDN URL instead. 252 253 Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's 254 proxy route catches 404 and re-emits to the player, anything 255 else is a 502.""" 256 resp = self.request( 257 "GET", 258 f"/instances/{instance_id}/attachments/{filename}", 259 ) 260 resp.raise_for_status() 261 return resp.content, resp.headers.get("content-type", "application/octet-stream")
Download one per-instance attachment as (bytes, content_type).
Buffers the whole body — attachments are bounded (typical case is a 10 KB binary or a small text file). For huge captures the caller should hand the player a CDN URL instead.
Raises httpx.HTTPStatusError on non-2xx — the platform's
proxy route catches 404 and re-emits to the player, anything
else is a 502.
77class PlatformClient(BaseHttpClient): 78 """Player-facing HTTP client for the ctfy platform. 79 80 Methods are grouped into resource namespaces (``client.teams.list()``, …) 81 plus a per-competition scope via ``client.competition(id)``. The admin 82 surface is a separate :class:`~ctfy.sdk.admin.AdminClient` reached via 83 :attr:`admin`. Used by the CLI, the SDK, the MCP server, and external callers. 84 """ 85 86 def __init__( 87 self, 88 server_url: str, 89 token: str = "", 90 max_retries: int = 3, 91 timeout: int = DEFAULT_CLIENT_TIMEOUT, 92 ): 93 self._url = server_url.rstrip("/") 94 super().__init__(f"{self._url}/api/v1", token, timeout=timeout, max_retries=max_retries) 95 96 # -- resource namespaces ------------------------------------------------ 97 98 @cached_property 99 def teams(self) -> TeamsResource: 100 """Public team discovery + profile reads.""" 101 return TeamsResource(self) 102 103 @cached_property 104 def users(self) -> UsersResource: 105 """Public per-user profile reads.""" 106 return UsersResource(self) 107 108 @cached_property 109 def me(self) -> MeResource: 110 """The calling user's profile, inbox, account, and progress.""" 111 return MeResource(self) 112 113 @cached_property 114 def auth(self) -> AuthResource: 115 """Password auth, OAuth identities, API tokens, sign-in discovery.""" 116 return AuthResource(self) 117 118 @cached_property 119 def achievements(self) -> AchievementsResource: 120 """Public badge catalog, recent-unlock feed, easter eggs.""" 121 return AchievementsResource(self) 122 123 @cached_property 124 def challenges(self) -> ChallengesResource: 125 """Global challenge catalog, attachments, facets, feedback chips.""" 126 return ChallengesResource(self) 127 128 @cached_property 129 def competitions(self) -> CompetitionsResource: 130 """Discover competitions (``list`` / ``get``). To act within one, use 131 :meth:`competition`.""" 132 return CompetitionsResource(self) 133 134 def competition(self, competition_id: str) -> Competition: 135 """Scope to one competition. The inherently competition-scoped 136 operations — register, your team (captain tooling + invites + 137 join-requests), the comp's challenges, team search, and standings — 138 hang off the returned :class:`~ctfy.sdk.competition.Competition` 139 handle, so ``competition_id`` is named once instead of on every call. 140 (Instance lifecycle and answer submission stay on :attr:`instances` / 141 :attr:`submissions`, keyed by ``instance_id``.)""" 142 return Competition(self, competition_id) 143 144 @cached_property 145 def instances(self) -> InstancesResource: 146 """Launch / control / inspect challenge instances by instance id 147 (the competition is inferred server-side).""" 148 return InstancesResource(self) 149 150 @cached_property 151 def submissions(self) -> SubmissionsResource: 152 """Submit / verify answers + list submissions (by instance id), plus 153 the instance-less QA challenges.""" 154 return SubmissionsResource(self) 155 156 @cached_property 157 def scoreboard(self) -> ScoreboardResource: 158 """Global standings, per-challenge stats, persisted snapshots.""" 159 return ScoreboardResource(self) 160 161 @cached_property 162 def activities(self) -> ActivitiesResource: 163 """Platform activity log + histogram.""" 164 return ActivitiesResource(self) 165 166 @cached_property 167 def nodes(self) -> NodesResource: 168 """Public worker-node list (operator node mgmt is on ``admin.nodes``).""" 169 return NodesResource(self) 170 171 @cached_property 172 def admin(self) -> AdminClient: 173 """Admin / operator surface (separate, role-gated server-side). 174 175 Shares this client's transport. See :class:`~ctfy.sdk.admin.AdminClient`. 176 """ 177 return AdminClient(self) 178 179 # -- top-level convenience: server status / realtime / version ---------- 180 181 def health(self) -> HealthResponse: 182 resp = self.request("GET", "/health") 183 _raise_for_status(resp) 184 return HealthResponse.model_validate(resp.json()) 185 186 def get_meta(self) -> MetaResponse: 187 """Server identity + challenge repo SHA + build version. 188 Admin tokens additionally see cluster capacity + team / solve 189 counts in the same payload.""" 190 resp = self.request("GET", "/meta") 191 _raise_for_status(resp) 192 return MetaResponse.model_validate(resp.json()) 193 194 def cluster_info(self) -> ClusterInfo: 195 """Aggregate worker-node capacity / utilisation (the public 196 capacity banner; no per-node detail).""" 197 resp = self.request("GET", "/cluster-info") 198 _raise_for_status(resp) 199 return ClusterInfo.model_validate(resp.json()) 200 201 def check_server_compatibility(self, *, health: HealthResponse | None = None) -> VersionCheck: 202 """Compare this client's ``ctfy`` version against the server's. 203 204 Uses the cheap unauthenticated ``/health`` probe (pass an 205 already-fetched :class:`HealthResponse` to avoid a second round 206 trip). Pure classification — never prints, never raises on a 207 mismatch, never mutates anything. The caller (CLI, MCP, harness) 208 decides what to do with :class:`~ctfy.core.version.VersionCheck` 209 (typically: print ``.message`` to stderr when not ``.quiet``). 210 211 A server too old to report its version yields 212 :attr:`Compatibility.UNKNOWN` (``health.version == ""``), which 213 is ``.quiet`` — so this stays silent against legacy servers 214 rather than crying wolf. 215 """ 216 h = health if health is not None else self.health() 217 return check_compatibility(package_version(), h.version) 218 219 def events(self, *, auto_reconnect: bool = True) -> EventStream: 220 """Open the platform SSE event stream. 221 222 Yields ``{"event": <name>, "data": <dict>}`` for each frame. 223 Filters: team-scoped events for every team the caller's user is 224 on (across every per-comp competition), plus all global events; 225 admin tokens additionally see admin-only frames. 226 227 Usage:: 228 229 with client.events() as stream: 230 for event in stream: 231 if event["event"] == "solve": 232 ... 233 234 Auto-reconnects on transient network errors with exponential 235 backoff (1s → 30s capped). Set ``auto_reconnect=False`` to make 236 the iterator raise instead. 237 """ 238 token = self._token 239 240 def _factory() -> httpx.Client: 241 # New client per session so a reconnect after a stale 242 # connection drops fresh sockets, not warmed-over ones. 243 return httpx.Client(base_url=self._base_url, timeout=None) 244 245 return EventStream( 246 client_factory=_factory, 247 path="/events", 248 token=token, 249 auto_reconnect=auto_reconnect, 250 )
Player-facing HTTP client for the ctfy platform.
Methods are grouped into resource namespaces (client.teams.list(), …)
plus a per-competition scope via client.competition(id). The admin
surface is a separate ~ctfy.sdk.admin.AdminClient reached via
admin. Used by the CLI, the SDK, the MCP server, and external callers.
98 @cached_property 99 def teams(self) -> TeamsResource: 100 """Public team discovery + profile reads.""" 101 return TeamsResource(self)
Public team discovery + profile reads.
103 @cached_property 104 def users(self) -> UsersResource: 105 """Public per-user profile reads.""" 106 return UsersResource(self)
Public per-user profile reads.
108 @cached_property 109 def me(self) -> MeResource: 110 """The calling user's profile, inbox, account, and progress.""" 111 return MeResource(self)
The calling user's profile, inbox, account, and progress.
113 @cached_property 114 def auth(self) -> AuthResource: 115 """Password auth, OAuth identities, API tokens, sign-in discovery.""" 116 return AuthResource(self)
Password auth, OAuth identities, API tokens, sign-in discovery.
118 @cached_property 119 def achievements(self) -> AchievementsResource: 120 """Public badge catalog, recent-unlock feed, easter eggs.""" 121 return AchievementsResource(self)
Public badge catalog, recent-unlock feed, easter eggs.
123 @cached_property 124 def challenges(self) -> ChallengesResource: 125 """Global challenge catalog, attachments, facets, feedback chips.""" 126 return ChallengesResource(self)
Global challenge catalog, attachments, facets, feedback chips.
128 @cached_property 129 def competitions(self) -> CompetitionsResource: 130 """Discover competitions (``list`` / ``get``). To act within one, use 131 :meth:`competition`.""" 132 return CompetitionsResource(self)
Discover competitions (list / get). To act within one, use
competition().
134 def competition(self, competition_id: str) -> Competition: 135 """Scope to one competition. The inherently competition-scoped 136 operations — register, your team (captain tooling + invites + 137 join-requests), the comp's challenges, team search, and standings — 138 hang off the returned :class:`~ctfy.sdk.competition.Competition` 139 handle, so ``competition_id`` is named once instead of on every call. 140 (Instance lifecycle and answer submission stay on :attr:`instances` / 141 :attr:`submissions`, keyed by ``instance_id``.)""" 142 return Competition(self, competition_id)
Scope to one competition. The inherently competition-scoped
operations — register, your team (captain tooling + invites +
join-requests), the comp's challenges, team search, and standings —
hang off the returned ~ctfy.sdk.competition.Competition
handle, so competition_id is named once instead of on every call.
(Instance lifecycle and answer submission stay on instances /
submissions, keyed by instance_id.)
144 @cached_property 145 def instances(self) -> InstancesResource: 146 """Launch / control / inspect challenge instances by instance id 147 (the competition is inferred server-side).""" 148 return InstancesResource(self)
Launch / control / inspect challenge instances by instance id (the competition is inferred server-side).
150 @cached_property 151 def submissions(self) -> SubmissionsResource: 152 """Submit / verify answers + list submissions (by instance id), plus 153 the instance-less QA challenges.""" 154 return SubmissionsResource(self)
Submit / verify answers + list submissions (by instance id), plus the instance-less QA challenges.
156 @cached_property 157 def scoreboard(self) -> ScoreboardResource: 158 """Global standings, per-challenge stats, persisted snapshots.""" 159 return ScoreboardResource(self)
Global standings, per-challenge stats, persisted snapshots.
161 @cached_property 162 def activities(self) -> ActivitiesResource: 163 """Platform activity log + histogram.""" 164 return ActivitiesResource(self)
Platform activity log + histogram.
166 @cached_property 167 def nodes(self) -> NodesResource: 168 """Public worker-node list (operator node mgmt is on ``admin.nodes``).""" 169 return NodesResource(self)
Public worker-node list (operator node mgmt is on admin.nodes).
171 @cached_property 172 def admin(self) -> AdminClient: 173 """Admin / operator surface (separate, role-gated server-side). 174 175 Shares this client's transport. See :class:`~ctfy.sdk.admin.AdminClient`. 176 """ 177 return AdminClient(self)
Admin / operator surface (separate, role-gated server-side).
Shares this client's transport. See ~ctfy.sdk.admin.AdminClient.
186 def get_meta(self) -> MetaResponse: 187 """Server identity + challenge repo SHA + build version. 188 Admin tokens additionally see cluster capacity + team / solve 189 counts in the same payload.""" 190 resp = self.request("GET", "/meta") 191 _raise_for_status(resp) 192 return MetaResponse.model_validate(resp.json())
Server identity + challenge repo SHA + build version. Admin tokens additionally see cluster capacity + team / solve counts in the same payload.
194 def cluster_info(self) -> ClusterInfo: 195 """Aggregate worker-node capacity / utilisation (the public 196 capacity banner; no per-node detail).""" 197 resp = self.request("GET", "/cluster-info") 198 _raise_for_status(resp) 199 return ClusterInfo.model_validate(resp.json())
Aggregate worker-node capacity / utilisation (the public capacity banner; no per-node detail).
201 def check_server_compatibility(self, *, health: HealthResponse | None = None) -> VersionCheck: 202 """Compare this client's ``ctfy`` version against the server's. 203 204 Uses the cheap unauthenticated ``/health`` probe (pass an 205 already-fetched :class:`HealthResponse` to avoid a second round 206 trip). Pure classification — never prints, never raises on a 207 mismatch, never mutates anything. The caller (CLI, MCP, harness) 208 decides what to do with :class:`~ctfy.core.version.VersionCheck` 209 (typically: print ``.message`` to stderr when not ``.quiet``). 210 211 A server too old to report its version yields 212 :attr:`Compatibility.UNKNOWN` (``health.version == ""``), which 213 is ``.quiet`` — so this stays silent against legacy servers 214 rather than crying wolf. 215 """ 216 h = health if health is not None else self.health() 217 return check_compatibility(package_version(), h.version)
Compare this client's ctfy version against the server's.
Uses the cheap unauthenticated /health probe (pass an
already-fetched HealthResponse to avoid a second round
trip). Pure classification — never prints, never raises on a
mismatch, never mutates anything. The caller (CLI, MCP, harness)
decides what to do with ~ctfy.core.version.VersionCheck
(typically: print .message to stderr when not .quiet).
A server too old to report its version yields
Compatibility.UNKNOWN (health.version == ""), which
is .quiet — so this stays silent against legacy servers
rather than crying wolf.
219 def events(self, *, auto_reconnect: bool = True) -> EventStream: 220 """Open the platform SSE event stream. 221 222 Yields ``{"event": <name>, "data": <dict>}`` for each frame. 223 Filters: team-scoped events for every team the caller's user is 224 on (across every per-comp competition), plus all global events; 225 admin tokens additionally see admin-only frames. 226 227 Usage:: 228 229 with client.events() as stream: 230 for event in stream: 231 if event["event"] == "solve": 232 ... 233 234 Auto-reconnects on transient network errors with exponential 235 backoff (1s → 30s capped). Set ``auto_reconnect=False`` to make 236 the iterator raise instead. 237 """ 238 token = self._token 239 240 def _factory() -> httpx.Client: 241 # New client per session so a reconnect after a stale 242 # connection drops fresh sockets, not warmed-over ones. 243 return httpx.Client(base_url=self._base_url, timeout=None) 244 245 return EventStream( 246 client_factory=_factory, 247 path="/events", 248 token=token, 249 auto_reconnect=auto_reconnect, 250 )
Open the platform SSE event stream.
Yields {"event": <name>, "data": <dict>} for each frame.
Filters: team-scoped events for every team the caller's user is
on (across every per-comp competition), plus all global events;
admin tokens additionally see admin-only frames.
Usage::
with client.events() as stream:
for event in stream:
if event["event"] == "solve":
...
Auto-reconnects on transient network errors with exponential
backoff (1s → 30s capped). Set auto_reconnect=False to make
the iterator raise instead.